Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@
<artifactId>ydb-sdk-query</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-opentelemetry</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>tech.ydb.test</groupId>
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.grpc.Metadata;

import tech.ydb.core.impl.call.GrpcFlows;
import tech.ydb.core.tracing.NoopTracer;
import tech.ydb.core.tracing.Span;

/**
* @author Nikolay Perfilov
Expand All @@ -24,6 +26,7 @@ public class GrpcRequestSettings {
private final Consumer<Metadata> trailersHandler;
private final BooleanSupplier pessimizationHook;
private final GrpcFlowControl flowControl;
private final Span span;

private GrpcRequestSettings(Builder builder) {
this.deadlineAfter = builder.deadlineAfter;
Expand All @@ -36,6 +39,7 @@ private GrpcRequestSettings(Builder builder) {
this.trailersHandler = builder.trailersHandler;
this.pessimizationHook = builder.pessimizationHook;
this.flowControl = builder.flowControl;
this.span = builder.span;
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -82,6 +86,10 @@ public GrpcFlowControl getFlowControl() {
return flowControl;
}

public Span getSpan() {
return span;
}

public static final class Builder {
private long deadlineAfter = 0L;
private boolean preferReadyChannel = false;
Expand All @@ -93,6 +101,7 @@ public static final class Builder {
private Consumer<Metadata> trailersHandler = null;
private BooleanSupplier pessimizationHook = null;
private GrpcFlowControl flowControl = GrpcFlows.SIMPLE_FLOW;
private Span span = NoopTracer;

/**
* Returns a new {@code Builder} with a deadline, based on the running Java Virtual Machine's
Expand Down Expand Up @@ -173,6 +182,11 @@ public Builder withPessimizationHook(BooleanSupplier pessimizationHook) {
return this;
}

public Builder withSpan(Span span) {
this.span = span;
return this;
}

public Builder disableDeadline() {
this.deadlineDisabled = true;
return this;
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.grpc.MethodDescriptor;

import tech.ydb.core.Result;
import tech.ydb.core.tracing.NoopTracer;
import tech.ydb.core.tracing.Tracer;
import tech.ydb.core.utils.URITools;


Expand Down Expand Up @@ -40,6 +42,22 @@ <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(

ScheduledExecutorService getScheduler();

default Tracer getTracer() {
return NoopTracer.getInstance();
}

default String getServerAddress() {
return "ydb.server";
}

default int getServerPort() {
return 2135;
}

default String getTransportScheme() {
return "grpc";
}

@Override
void close();

Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import tech.ydb.core.impl.auth.GrpcAuthRpc;
import tech.ydb.core.impl.pool.ChannelFactoryLoader;
import tech.ydb.core.impl.pool.ManagedChannelFactory;
import tech.ydb.core.tracing.NoopTracer;
import tech.ydb.core.tracing.Tracer;
import tech.ydb.core.utils.Version;


Expand Down Expand Up @@ -86,6 +88,7 @@ public enum InitMode {
private boolean useDefaultGrpcResolver = false;
private GrpcCompression compression = GrpcCompression.NO_COMPRESSION;
private InitMode initMode = InitMode.SYNC;
private Tracer tracer = NoopTracer.getInstance();

/**
* can cause leaks https://github.com/grpc/grpc-java/issues/9340
Expand Down Expand Up @@ -189,6 +192,10 @@ public boolean useDefaultGrpcResolver() {
return useDefaultGrpcResolver;
}

public Tracer getTracer() {
return tracer;
}

public ManagedChannelFactory getManagedChannelFactory() {
if (channelFactoryBuilder == null) {
channelFactoryBuilder = ChannelFactoryLoader.load();
Expand Down Expand Up @@ -408,6 +415,17 @@ public GrpcTransportBuilder withScheduler(ScheduledExecutorService scheduler) {
return this;
}

/**
* Configures tracing implementation used by higher-level SDK operations.
*
* @param tracer tracing facade implementation
* @return this builder instance
*/
public GrpcTransportBuilder withTracer(Tracer tracer) {
this.tracer = Objects.requireNonNull(tracer, "tracer is null");
return this;
}

/**
* use {@link GrpcTransportBuilder#withGrpcRetry(boolean) } instead
* @return this
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ private Metadata makeMetadataFromSettings(GrpcRequestSettings settings) {
if (settings.getClientCapabilities() != null) {
settings.getClientCapabilities().forEach(name -> metadata.put(YdbHeaders.YDB_CLIENT_CAPABILITIES, name));
}
if (settings.getSpan() != null) {
settings.getSpan().injectHeaders((key, value) -> {
if (key == null || value == null || key.isEmpty() || value.isEmpty()) {
return;
}
Metadata.Key<String> header = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
metadata.put(header, value);
});
}
return metadata;
}

Expand Down
25 changes: 25 additions & 0 deletions core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tech.ydb.core.impl.pool.GrpcChannel;
import tech.ydb.core.impl.pool.GrpcChannelPool;
import tech.ydb.core.impl.pool.ManagedChannelFactory;
import tech.ydb.core.tracing.Tracer;

/**
* @author Nikolay Perfilov
Expand All @@ -34,20 +35,24 @@ public class YdbTransportImpl extends BaseGrpcTransport {
private static final Logger logger = LoggerFactory.getLogger(YdbTransportImpl.class);

private final String database;
private final String transportScheme;
private final EndpointRecord discoveryEndpoint;
private final ScheduledExecutorService scheduler;
private final ManagedChannelFactory channelFactory;
private final AuthCallOptions callOptions;
private final EndpointPool endpointPool;
private final GrpcChannelPool channelPool;
private final YdbDiscovery discovery;
private final Tracer tracer;

public YdbTransportImpl(GrpcTransportBuilder builder) {
BalancingSettings balancingSettings = getBalancingSettings(builder);
Duration discoveryTimeout = Duration.ofMillis(builder.getDiscoveryTimeoutMillis());

this.database = Strings.nullToEmpty(builder.getDatabase());
this.discoveryEndpoint = getDiscoveryEndpoint(builder);
this.transportScheme = builder.getUseTls() ? "grpcs" : "grpc";
this.tracer = builder.getTracer();

logger.info("Create YDB transport with endpoint {} and {}", discoveryEndpoint, balancingSettings);

Expand Down Expand Up @@ -158,6 +163,26 @@ public String getDatabase() {
return database;
}

@Override
public Tracer getTracer() {
return tracer;
}

@Override
public String getServerAddress() {
return discoveryEndpoint.getHost();
}

@Override
public int getServerPort() {
return discoveryEndpoint.getPort();
}

@Override
public String getTransportScheme() {
return transportScheme;
}

@Override
public AuthCallOptions getAuthCallOptions() {
return callOptions;
Expand Down
140 changes: 140 additions & 0 deletions core/src/main/java/tech/ydb/core/tracing/DbSpanFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package tech.ydb.core.tracing;

import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;

import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcTransport;

/**
* Common helper to build DB operation spans with YDB attributes.
*/
public final class DbSpanFactory {
private static final int MAX_ISSUES_LENGTH = 512;

private final Tracer tracer;
private final String dbNamespace;
private final String serverAddress;
private final int serverPort;
private final String scheme;

public DbSpanFactory(GrpcTransport transport) {
this.tracer = transport.getTracer();
this.dbNamespace = transport.getDatabase();
this.serverAddress = transport.getServerAddress();
this.serverPort = transport.getServerPort();
this.scheme = transport.getTransportScheme();
}

public OperationSpan startClientSpan(String operationName, @Nullable String traceId, @Nullable String sessionId) {
SpanBuilder spanBuilder = tracer
.spanBuilder(operationName)
.setAttribute("db.system.name", "ydb")
.setAttribute("db.namespace", dbNamespace)
.setAttribute("server.address", serverAddress)
.setAttribute("server.port", serverPort)
.setAttribute("url.scheme", scheme);

if (traceId != null && !traceId.isEmpty()) {
spanBuilder.setAttribute("ydb.trace_id", traceId);
}
if (sessionId != null && !sessionId.isEmpty()) {
spanBuilder.setAttribute("ydb.session_id", sessionId);
}

return new OperationSpan(spanBuilder.startSpan());
}

public static final class OperationSpan {
private final Span span;
private final AtomicBoolean finished = new AtomicBoolean(false);

OperationSpan(Span span) {
this.span = span;
}

public Span getSpan() {
return span;
}

public OperationSpan setAttribute(String key, String value) {
if (value != null && !value.isEmpty()) {
span.setAttribute(key, value);
}
return this;
}

public void finishStatus(Status status) {
if (status == null) {
finishError(new IllegalStateException("Operation finished without status"));
return;
}

if (!status.isSuccess()) {
StatusCode code = status.getCode();
span.setAttribute("db.response.status_code", code.getCode());
span.setAttribute("error.type", toErrorType(code));
span.setAttribute("ydb.status_code", code.getCode());
span.setAttribute("ydb.status_name", code.name());
span.setAttribute("ydb.issues_count", status.getIssues().length);
String issuesText = formatIssues(status.getIssues());
if (issuesText != null) {
span.setAttribute("ydb.issues", issuesText);
}
}

endOnce();
}

public void finishError(Throwable error) {
if (error == null) {
finishStatus(Status.of(StatusCode.CLIENT_INTERNAL_ERROR));
return;
}

span.recordException(error).setAttribute("error.type", normalizeThrowable(error));
endOnce();
}

private void endOnce() {
if (finished.compareAndSet(false, true)) {
span.end();
}
}

private static String toErrorType(StatusCode code) {
return "ydb." + code.name().toLowerCase(Locale.ROOT);
}

private static String normalizeThrowable(Throwable error) {
String className = error.getClass().getSimpleName();
if (className == null || className.isEmpty()) {
return "java.throwable";
}
return "java." + className.toLowerCase(Locale.ROOT);
}

@Nullable
private static String formatIssues(Issue[] issues) {
if (issues == null || issues.length == 0) {
return null;
}

StringBuilder sb = new StringBuilder();
for (Issue issue : issues) {
if (sb.length() > 0) {
sb.append("; ");
}
sb.append(issue);
if (sb.length() > MAX_ISSUES_LENGTH) {
return sb.substring(0, MAX_ISSUES_LENGTH);
}
}
return sb.toString();
}
}
}
Loading