Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ dependencies {
implementation(libs.edc.iam.mock)
implementation(libs.edc.data.plane.spi)
implementation(libs.opentelemetry.exporter.otlp)
runtimeOnly(libs.edc.monitor.jdk.logger)
}

application {
Expand Down
14 changes: 8 additions & 6 deletions federated-catalog/fc-01-embedded/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ dependencies {
The [config.properties](../fc-01-embedded/fc-connector/config.properties)
file contains the necessary configurations
for this `fc-connector`, including the standard settings for a regular connector, along with additional configurations for a
federated catalog, such as catalog api endpoint and crawler execution interval.
federated catalog, such as the management API path and crawler execution interval.

```properties
web.http.catalog.path=/api/catalog
web.http.catalog.port=29195
web.http.management.port=29193
web.http.management.path=/api/management

edc.catalog.cache.execution.delay.seconds=5
edc.catalog.cache.execution.period.seconds=5
Expand All @@ -62,7 +62,7 @@ To run the connector, execute the following command
java -Dedc.fs.config=federated-catalog/fc-01-embedded/fc-connector/config.properties -jar federated-catalog/fc-01-embedded/fc-connector/build/libs/fc-connector.jar
```

If the execution is successful, then the Catalog API of our `fc-connector` will listen on port `29195`.
If the execution is successful, then the Management API of our `fc-connector` will listen on port `29193`.

If you observe the logs, you can see the following recurring lines,

Expand All @@ -83,12 +83,14 @@ This means our FC crawler is running, and the crawler found one node, which is t

### 3. Test catalog query API

To query the catalogs from `fc-connector` side, we can now call the catalog API of our embedded federated catalog.
To query the catalogs from `fc-connector` side, we can now call the catalog API through the management API of our embedded federated catalog.
Use the following request to invoke the catalog API:

```http request
curl -d @federated-catalog/fc-01-embedded/resources/empty-query.json \
-H 'content-type: application/json' http://localhost:29195/api/catalog/v1alpha/catalog/query \
-H 'content-type: application/json' \
-H 'x-api-key: password' \
http://localhost:29193/api/management/v3/catalogs/request \
-s | jq
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ edc.dsp.callback.address=http://localhost:19194/protocol
web.http.port=29191
web.http.path=/api
web.http.management.port=29193
web.http.management.path=/management
web.http.management.path=/api/management
web.http.protocol.port=29194
web.http.protocol.path=/protocol
edc.transfer.proxy.token.signer.privatekey.alias=private-key
Expand All @@ -14,14 +14,9 @@ web.http.control.port=29192
web.http.control.path=/control
edc.dataplane.api.public.baseurl=http://localhost:29291/public


web.http.management.auth.key=password
edc.mock.region=us


web.http.catalog.path=/api/catalog
web.http.catalog.port=29195

edc.catalog.cache.execution.delay.seconds=5
edc.catalog.cache.execution.period.seconds=5
edc.catalog.cache.partition.num.crawlers=1
Expand Down
11 changes: 6 additions & 5 deletions federated-catalog/fc-02-standalone/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ a single target catalog node; the `participant-connector` that we had set up in
Querying the catalog API will therefore yield just one catalog, which is the contract offered by this connector.

The [config.properties](./standalone-fc/config.properties) file contains the necessary configurations,
like the `web.http.catalog.path`, which is the catalog API endpoint of this standalone FC.
like the `web.http.management.path`, which is the management API endpoint of this standalone FC.

```properties
web.http.catalog.path=/api/catalog
web.http.catalog.port=39195
web.http.management.path=/api/management
web.http.management.port=39193
```

### 2. Start the fc-connector
Expand All @@ -57,7 +57,7 @@ To run the federated catalog, execute the following command
java -Dedc.fs.config=federated-catalog/fc-02-standalone/standalone-fc/config.properties -jar federated-catalog/fc-02-standalone/standalone-fc/build/libs/standalone-fc.jar
```

If the execution is successful, then the Catalog API of our standalone FC will listen on port `39195`.
If the execution is successful, then the Management API of our standalone FC will listen on port `39193`.



Expand All @@ -69,7 +69,8 @@ To get the combined set of catalogs, use the following request:

```http request
curl -d @federated-catalog/fc-01-embedded/resources/empty-query.json \
-H 'content-type: application/json' http://localhost:39195/api/catalog/v1alpha/catalog/query \
-H 'content-type: application/json' \
http://localhost:39193/api/management/v3/catalogs/request \
-s | jq
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ edc.participant.id=consumer

web.http.port=39191
web.http.path=/api

web.http.catalog.path=/api/catalog
web.http.catalog.port=39195
web.http.management.path=/api/management
web.http.management.port=39193

edc.catalog.cache.execution.delay.seconds=5
edc.catalog.cache.execution.period.seconds=5
Expand Down
11 changes: 7 additions & 4 deletions federated-catalog/fc-03-static-node-directory/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,16 @@ To run the federated catalog, execute the following command
java -Dedc.fs.config=federated-catalog/fc-02-standalone/standalone-fc/config.properties -jar federated-catalog/fc-03-static-node-directory/standalone-fc-with-node-resolver/build/libs/standalone-fc-with-node-resolver.jar
```

If the execution is successful, then the Catalog API of our standalone FC will listen on port `39195`.
If the execution is successful, then the Management API of our standalone FC will listen on port `39193`.

#### Test catalog query API

To get the combined set of catalogs, use the following request:

```http request
curl -d @federated-catalog/fc-01-embedded/resources/empty-query.json \
-H 'content-type: application/json' http://localhost:39195/api/catalog/v1alpha/catalog/query \
-H 'content-type: application/json' \
http://localhost:39193/api/management/v3/catalogs/request \
-s | jq
```

Expand All @@ -126,14 +127,16 @@ To run the federated catalog, execute the following command
java -Dedc.fs.config=federated-catalog/fc-01-embedded/fc-connector/config.properties -jar federated-catalog/fc-03-static-node-directory/embedded-fc-with-node-resolver/build/libs/fc-connector-with-node-resolver.jar
```

If the execution is successful, then the Catalog API of our standalone FC will listen on port `29195`.
If the execution is successful, then the Management API of our embedded FC connector will listen on port `29193`.

#### Test catalog query API

To get the combined set of catalogs, use the following request:

```http request
curl -d @federated-catalog/fc-01-embedded/resources/empty-query.json \
-H 'content-type: application/json' http://localhost:29195/api/catalog/v1alpha/catalog/query \
-H 'content-type: application/json' \
-H 'x-api-key: password' \
http://localhost:29193/api/management/v3/catalogs/request \
-s | jq
```
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ dependencies {
implementation(libs.edc.configuration.filesystem)
implementation(libs.edc.management.api)
implementation(libs.edc.dsp)
implementation(libs.edc.participant.context.single.core)
implementation(libs.edc.iam.mock)
implementation(libs.edc.http)
implementation(libs.edc.edr.store.core)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ dependencies {
runtimeOnly(libs.edc.token.core)
implementation(libs.edc.http)
runtimeOnly(libs.edc.dsp)
runtimeOnly(libs.edc.participant.context.single.core)
implementation(libs.edc.iam.mock)

}
Expand Down
4 changes: 1 addition & 3 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ format.version = "1.1"

[versions]
awaitility = "4.2.2"
edc = "0.16.0"
edc = "0.17.0"
okhttp-mockwebserver = "5.3.2"
restAssured = "6.0.0"
rsApi = "4.0.0"
Expand Down Expand Up @@ -42,9 +42,7 @@ edc-json-ld-lib = { module = "org.eclipse.edc:json-ld-lib", version.ref = "edc"
edc-json-ld-spi = { module = "org.eclipse.edc:json-ld-spi", version.ref = "edc" }
edc-junit = { module = "org.eclipse.edc:junit", version.ref = "edc" }
edc-management-api = { module = "org.eclipse.edc:management-api", version.ref = "edc" }
edc-participant-context-single-core = { module = "org.eclipse.edc:participant-context-single-core", version.ref = "edc" }
edc-management-api-test-fixtures = { module = "org.eclipse.edc:management-api-test-fixtures", version.ref = "edc" }
edc-monitor-jdk-logger = { module = "org.eclipse.edc:monitor-jdk-logger", version.ref = "edc" }
edc-runtime-core = { module = "org.eclipse.edc:runtime-core", version.ref = "edc" }
edc-token-core = { module = "org.eclipse.edc:token-core", version.ref = "edc" }
edc-transfer-data-plane-signaling = { module = "org.eclipse.edc:transfer-data-plane-signaling", version.ref = "edc" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class FederatedCatalogCommon {
public static final int CRAWLER_EXECUTION_PERIOD_VALUE = 5;
public static final int TIMEOUT = 5 * CRAWLER_EXECUTION_PERIOD_VALUE;

public static final String EMBEDDED_FC_CATALOG_API_ENDPOINT = "http://localhost:29195/api/catalog/v1alpha/catalog/query";
public static final String STANDALONE_FC_CATALOG_API_ENDPOINT = "http://localhost:39195/api/catalog/v1alpha/catalog/query";
public static final String EMBEDDED_FC_CATALOG_API_ENDPOINT = "http://localhost:29193/api/management/v3/catalogs/request";
public static final String STANDALONE_FC_CATALOG_API_ENDPOINT = "http://localhost:39193/api/management/v3/catalogs/request";
public static final String EMPTY_QUERY_FILE_PATH = "federated-catalog/fc-01-embedded/resources/empty-query.json";
public static final String TYPE = "[0].@type";
public static final String DATASET_ASSET_ID = "[0].dataset[0].@id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.junit.annotations.EndToEndTest;
import org.eclipse.edc.junit.extensions.RuntimeExtension;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand All @@ -40,9 +41,11 @@
public class FederatedCatalog01embeddedTest {

@RegisterExtension
@Order(0)
static final RuntimeExtension PARTICIPANT_CONNECTOR = getProvider();

@RegisterExtension
@Order(1)
static final RuntimeExtension FC_CONNECTOR = getFcEmbeddedConnector(":federated-catalog:fc-01-embedded:fc-connector");

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public String createPolicyDefinition(String requestBody) {
.when()
.post("/policydefinitions")
.then()
.log().ifValidationFails()
.statusCode(200)
.contentType(JSON)
.extract().jsonPath().getString(ID);
Expand All @@ -66,6 +67,7 @@ public String createContractDefinition(String requestBody) {
.when()
.post("/contractdefinitions")
.then()
.log().ifValidationFails()
.statusCode(200)
.extract().jsonPath().getString(ID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ plugins {
dependencies {
runtimeOnly(libs.edc.bom.controlplane.base)
implementation(libs.edc.control.plane.api.client)
runtimeOnly(libs.edc.participant.context.single.core)
runtimeOnly(libs.edc.iam.mock)
runtimeOnly(libs.edc.transfer.data.plane.signaling)
runtimeOnly(libs.edc.validator.data.address.http.data)
Expand Down
1 change: 1 addition & 0 deletions transfer/transfer-06-kafka-broker/1-asset.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
"@type": "Asset",
"@id": "kafka-stream-asset",
"properties": {
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
},
"@type": "PolicyDefinition",
"@id": "no-constraint-policy",
"policy": {
"@type": "odrl:Set"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
"@type": "ContractDefinition",
"@id": "contract-definition",
"accessPolicyId": "no-constraint-policy",
"contractPolicyId": "no-constraint-policy",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.jetbrains.annotations.NotNull;

import java.util.Set;

import static org.eclipse.edc.dataaddress.kafka.spi.KafkaDataAddressSchema.KAFKA_TYPE;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;

class KafkaToKafkaDataFlowController implements DataFlowController {

Expand All @@ -50,12 +50,12 @@ public StatusResult<DataFlowResponse> prepare(TransferProcess transferProcess, P

var contentDataAddress = transferProcess.getContentDataAddress();
var kafkaDataAddress = DataAddress.Builder.newInstance()
.type(EndpointDataReference.EDR_SIMPLE_TYPE)
.property(EndpointDataReference.ID, transferProcess.getCorrelationId())
.property(EndpointDataReference.ENDPOINT, contentDataAddress.getStringProperty("kafka.bootstrap.servers"))
.property(EndpointDataReference.AUTH_KEY, username)
.property(EndpointDataReference.AUTH_CODE, password)
.property(EndpointDataReference.CONTRACT_ID, transferProcess.getContractId())
.type("EDR")
.property(EDC_NAMESPACE + "id", transferProcess.getCorrelationId())
.property(EDC_NAMESPACE + "endpoint", contentDataAddress.getStringProperty("kafka.bootstrap.servers"))
.property(EDC_NAMESPACE + "authKey", username)
.property(EDC_NAMESPACE + "authCode", password)
.property(EDC_NAMESPACE + "contractId", transferProcess.getContractId())
.property(KafkaDataAddressSchema.TOPIC, contentDataAddress.getStringProperty(KafkaDataAddressSchema.TOPIC))
.build();

Expand Down
Loading