Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ class BeamModulePlugin implements Plugin<Project> {
log4j2_log4j12_api : "org.apache.logging.log4j:log4j-1.2-api:$log4j2_version",
mockito_core : "org.mockito:mockito-core:4.11.0",
mockito_inline : "org.mockito:mockito-inline:4.11.0",
mockito_junit_jupiter : "org.mockito:mockito-junit-jupiter:4.11.0",
mongo_java_driver : "org.mongodb:mongodb-driver-sync:5.5.0",
mongo_bson : "org.mongodb:bson:5.5.0",
mongodb_driver_core : "org.mongodb:mongodb-driver-core:5.5.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies {
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.mockito_inline
testImplementation library.java.mockito_junit_jupiter
testImplementation library.java.jupiter_api
testImplementation library.java.jupiter_params
testRuntimeOnly library.java.jupiter_engine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ enum ConfigurableOption {
* <li>{@code metrics} - Enables authentication for metric exports.
* <li>{@code traces} - Enables authentication for trace exports.
* <li>{@code all} - Enables authentication for all exports.
* <li>{@code none} - Disables authentication for all exports.
* </ul>
*
* <p>The values are case-sensitive. Whitespace around commas and values is ignored. Can be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
Expand All @@ -41,8 +46,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.opentelemetry.gcp.auth.GoogleAuthException.Reason;

Expand All @@ -62,25 +68,34 @@
* @see AutoConfigurationCustomizerProvider
* @see GoogleCredentials
*/
@AutoService(AutoConfigurationCustomizerProvider.class)
@Internal
@AutoService(AutoConfigurationCustomizerProvider.class)
public class GcpAuthAutoConfigurationCustomizerProvider
implements AutoConfigurationCustomizerProvider {

private static final Logger LOG =
Logger.getLogger(GcpAuthAutoConfigurationCustomizerProvider.class.getName());
private static final String SIGNAL_TARGET_WARNING_FIX_SUGGESTION =
String.format(
"You may safely ignore this warning if it is intentional, otherwise please configure the '%s' by exporting valid values to environment variable: %s or by setting valid values in system property: %s.",
ConfigurableOption.GOOGLE_OTEL_AUTH_TARGET_SIGNALS.getUserReadableName(),
ConfigurableOption.GOOGLE_OTEL_AUTH_TARGET_SIGNALS.getEnvironmentVariable(),
ConfigurableOption.GOOGLE_OTEL_AUTH_TARGET_SIGNALS.getSystemProperty());

static final String QUOTA_USER_PROJECT_HEADER = "x-goog-user-project";
static final String GCP_USER_PROJECT_ID_KEY = "gcp.project_id";

static final String SIGNAL_TYPE_TRACES = "traces";
static final String SIGNAL_TYPE_METRICS = "metrics";
static final String SIGNAL_TYPE_ALL = "all";

private @Nullable GoogleCredentials credentials;
static final String SIGNAL_TYPE_NONE = "none";

/**
* Customizes the provided {@link AutoConfigurationCustomizer} such that authenticated exports to
* GCP Telemetry API are possible from the configured OTLP exporter.
*
* <p>This method performs the following:
* <p>This method attempts to retrieve Google Application Default Credentials (ADC) and performs
* the following:
*
* <ul>
* <li>Verifies whether the configured OTLP endpoint (base or signal specific) is a known GCP
Expand All @@ -99,147 +114,139 @@ public class GcpAuthAutoConfigurationCustomizerProvider
*/
@Override
public void customize(@Nonnull AutoConfigurationCustomizer autoConfiguration) {
java.util.function.Supplier<GoogleCredentials> credentialsSupplier =
new java.util.function.Supplier<GoogleCredentials>() {
private @javax.annotation.Nullable GoogleCredentials credentials;

@Override
public synchronized GoogleCredentials get() {
if (credentials == null) {
try {
credentials = GoogleCredentials.getApplicationDefault();
} catch (IOException e) {
throw new GoogleAuthException(Reason.FAILED_ADC_RETRIEVAL, e);
}
}
return credentials;
}
};
autoConfiguration
.addSpanExporterCustomizer(this::customizeSpanExporter)
.addMetricExporterCustomizer(this::customizeMetricExporter)
.addResourceCustomizer(this::customizeResource);
.addSpanExporterCustomizer(
(spanExporter, configProperties) ->
customizeSpanExporter(spanExporter, credentialsSupplier, configProperties))
.addMetricExporterCustomizer(
(metricExporter, configProperties) ->
customizeMetricExporter(metricExporter, credentialsSupplier, configProperties))
.addResourceCustomizer(
(resource, configProperties) ->
customizeResource(resource, credentialsSupplier, configProperties));
}
Comment thread
stankiewicz marked this conversation as resolved.

@Override
public int order() {
return Integer.MAX_VALUE - 1;
}

private synchronized GoogleCredentials getCredentials() {
if (credentials == null) {
try {
credentials = GoogleCredentials.getApplicationDefault();
} catch (IOException e) {
throw new GoogleAuthException(Reason.FAILED_ADC_RETRIEVAL, e);
}
}
return credentials;
}

private SpanExporter customizeSpanExporter(
SpanExporter exporter, ConfigProperties configProperties) {
private static SpanExporter customizeSpanExporter(
SpanExporter exporter,
java.util.function.Supplier<GoogleCredentials> credentialsSupplier,
ConfigProperties configProperties) {
if (isSignalTargeted(SIGNAL_TYPE_TRACES, configProperties)) {
return addAuthorizationHeaders(exporter, configProperties);
return addAuthorizationHeaders(exporter, credentialsSupplier.get(), configProperties);
} else {
String[] params = {
SIGNAL_TYPE_TRACES, SIGNAL_TYPE_NONE, SIGNAL_TARGET_WARNING_FIX_SUGGESTION
};
LOG.log(
Level.WARNING,
"GCP Authentication Extension is not configured for signal type: {0} or is configured with signal type: {1}. {2}",
params);
}
return exporter;
}
Comment thread
stankiewicz marked this conversation as resolved.

private MetricExporter customizeMetricExporter(
MetricExporter exporter, ConfigProperties configProperties) {
private static MetricExporter customizeMetricExporter(
MetricExporter exporter,
java.util.function.Supplier<GoogleCredentials> credentialsSupplier,
ConfigProperties configProperties) {
if (isSignalTargeted(SIGNAL_TYPE_METRICS, configProperties)) {
return addAuthorizationHeaders(exporter, configProperties);
return addAuthorizationHeaders(exporter, credentialsSupplier.get(), configProperties);
} else {
String[] params = {
SIGNAL_TYPE_METRICS, SIGNAL_TYPE_NONE, SIGNAL_TARGET_WARNING_FIX_SUGGESTION
};
LOG.log(
Level.WARNING,
"GCP Authentication Extension is not configured for signal type: {0} or is configured with signal type: {1}. {2}",
params);
}
return exporter;
}
Comment thread
stankiewicz marked this conversation as resolved.

// Checks if the auth extension is configured to target the passed signal for authentication.
private static boolean isSignalTargeted(String checkSignal, ConfigProperties configProperties) {
String endpoint = configProperties.getString("otel.exporter.otlp." + checkSignal + ".endpoint");
if (endpoint == null) {
endpoint = configProperties.getString("otel.exporter.otlp.endpoint");
}
if (endpoint == null) {
return false;
}

try {
java.net.URI uri = new java.net.URI(endpoint);
String host = uri.getHost();
String scheme = uri.getScheme();
if (host == null
|| scheme == null
|| !scheme.equalsIgnoreCase("https")
|| (!host.equalsIgnoreCase("telemetry.googleapis.com")
&& !host.equalsIgnoreCase("telemetry.mtls.googleapis.com"))) {
return false;
}
} catch (java.net.URISyntaxException e) {
return false;
}

String userSpecifiedTargetedSignals =
ConfigurableOption.GOOGLE_OTEL_AUTH_TARGET_SIGNALS.getConfiguredValueWithFallback(
configProperties, () -> SIGNAL_TYPE_ALL);
return stream(userSpecifiedTargetedSignals.split(","))
.map(String::trim)
.anyMatch(
targetedSignal ->
targetedSignal.equals(checkSignal) || targetedSignal.equals(SIGNAL_TYPE_ALL));
}

private boolean isAnySignalTargeted(ConfigProperties configProperties) {
return isSignalTargeted(SIGNAL_TYPE_TRACES, configProperties)
|| isSignalTargeted(SIGNAL_TYPE_METRICS, configProperties);
List<String> targetedSignals =
stream(userSpecifiedTargetedSignals.split(",")).map(String::trim).collect(toList());
if (targetedSignals.contains(SIGNAL_TYPE_NONE)) {
return false;
}
return targetedSignals.contains(checkSignal) || targetedSignals.contains(SIGNAL_TYPE_ALL);
}

// Adds authorization headers to the calls made by the OtlpGrpcSpanExporter and
// OtlpHttpSpanExporter.
private SpanExporter addAuthorizationHeaders(
SpanExporter exporter, ConfigProperties configProperties) {
private static SpanExporter addAuthorizationHeaders(
SpanExporter exporter, GoogleCredentials credentials, ConfigProperties configProperties) {
if (exporter instanceof OtlpHttpSpanExporter) {
SpanExporter result =
OtlpHttpSpanExporterBuilder builder =
((OtlpHttpSpanExporter) exporter)
.toBuilder()
.setHeaders(() -> getRequiredHeaderMap(configProperties))
.build();
exporter.shutdown();
return result;
.setHeaders(() -> getRequiredHeaderMap(credentials, configProperties));
return builder.build();
} else if (exporter instanceof OtlpGrpcSpanExporter) {
SpanExporter result =
OtlpGrpcSpanExporterBuilder builder =
((OtlpGrpcSpanExporter) exporter)
.toBuilder()
.setHeaders(() -> getRequiredHeaderMap(configProperties))
.build();
exporter.shutdown();
return result;
.setHeaders(() -> getRequiredHeaderMap(credentials, configProperties));
return builder.build();
}
return exporter;
}

// Adds authorization headers to the calls made by the OtlpGrpcMetricExporter and
// OtlpHttpMetricExporter.
private MetricExporter addAuthorizationHeaders(
MetricExporter exporter, ConfigProperties configProperties) {
private static MetricExporter addAuthorizationHeaders(
MetricExporter exporter, GoogleCredentials credentials, ConfigProperties configProperties) {
if (exporter instanceof OtlpHttpMetricExporter) {
MetricExporter result =
OtlpHttpMetricExporterBuilder builder =
((OtlpHttpMetricExporter) exporter)
.toBuilder()
.setHeaders(() -> getRequiredHeaderMap(configProperties))
.build();
exporter.shutdown();
return result;
.setHeaders(() -> getRequiredHeaderMap(credentials, configProperties));
return builder.build();
} else if (exporter instanceof OtlpGrpcMetricExporter) {
MetricExporter result =
OtlpGrpcMetricExporterBuilder builder =
((OtlpGrpcMetricExporter) exporter)
.toBuilder()
.setHeaders(() -> getRequiredHeaderMap(configProperties))
.build();
exporter.shutdown();
return result;
.setHeaders(() -> getRequiredHeaderMap(credentials, configProperties));
return builder.build();
}
return exporter;
}

private Map<String, String> getRequiredHeaderMap(ConfigProperties configProperties) {
GoogleCredentials creds = getCredentials();
private static Map<String, String> getRequiredHeaderMap(
GoogleCredentials credentials, ConfigProperties configProperties) {
Map<String, List<String>> gcpHeaders;
try {
// this also refreshes the credentials, if required
gcpHeaders = creds.getRequestMetadata();
gcpHeaders = credentials.getRequestMetadata();
} catch (IOException e) {
throw new GoogleAuthException(Reason.FAILED_ADC_REFRESH, e);
}
if (gcpHeaders == null) {
return Map.of();
}
Map<String, String> flattenedHeaders =
gcpHeaders.entrySet().stream()
.filter(entry -> entry.getKey() != null && entry.getValue() != null)
.collect(
toMap(
Map.Entry::getKey,
Expand All @@ -262,16 +269,19 @@ private Map<String, String> getRequiredHeaderMap(ConfigProperties configProperti
}

// Updates the current resource with the attributes required for ingesting OTLP data on GCP.
private Resource customizeResource(Resource resource, ConfigProperties configProperties) {
if (!isAnySignalTargeted(configProperties)) {
private static Resource customizeResource(
Resource resource,
java.util.function.Supplier<GoogleCredentials> credentialsSupplier,
ConfigProperties configProperties) {
if (!isSignalTargeted(SIGNAL_TYPE_TRACES, configProperties)
&& !isSignalTargeted(SIGNAL_TYPE_METRICS, configProperties)) {
return resource;
}

String gcpProjectId;
try {
gcpProjectId = ConfigurableOption.GOOGLE_CLOUD_PROJECT.getConfiguredValue(configProperties);
} catch (ConfigurationException e) {
gcpProjectId = getCredentials().getProjectId();
gcpProjectId = credentialsSupplier.get().getProjectId();
if (gcpProjectId == null || gcpProjectId.isEmpty()) {
throw e;
}
Comment thread
stankiewicz marked this conversation as resolved.
Expand Down
Loading
Loading