Pipeline
Pipelines are used by the Transformation Engine to orchestrate Sources, Transformers, Sinks, and Datasets.
Overview
A Pipeline is a directed graph that describes how Sources, Transformers, Sinks, and Datasets should be connected together in the runtime environment. Creating a Pipeline involves configuring each Transformer according to the Transformer Template that it was registered with. When a Pipeline is submitted, runtime validation is performed to ensure the graph makes sense; validation can fail for several reasons:
- The configuration for a Transformer is incomplete according to its Transformer Template.
- An input or output connection on a Transformer is of the wrong type (for example, connecting an INTERNAL_KAFKA Dataset to an input of type INTERNAL_ICEBERG).
- An underlying storage medium in [R]DP is unavailable for use by the Transformation Engine.
Data Model
Like Transformer Templates, Pipelines are represented as JSON objects that document the desired Transformers, Datasets, and connections. To dig deeper, take the example Pipeline shown earlier:
The equivalent JSON object to represent this Pipeline looks like:
{
"uid": "6d769fce-fa56-402c-bc36-36ffd69b7b3b",
"name": "Developer Demo",
"description": "",
"state": "inactive",
"priority": "LOW",
"security_markings": "UNCLASSIFIED",
"mock": false,
"created_by": "urn:rdp:keycloak-user:admin",
"annotations": [
{
"key": "rdp.catalog.datasource",
"value": "fe02644a-2c10-4e7e-a9b0-3eaad32ccea7"
},
{
"key": "rdp.catalog.enablement",
"value": "fd36293f-42bd-4d55-8a2b-eb266987acc7"
},
{
"key": "rdp.catalog.dataset",
"value": "18120485-b5d6-4964-9337-309613476e0f"
}
],
"transformers": [
{
"uid": "6be9e029-10d2-4eb5-8132-c4dece414bc2",
"display_name": "TAK Consumer",
"description": "Consumes Cursor on Target (CoT) XML messages from TAK.",
"configuration": {
"environment": {
"CLASSIFICATION_LEVEL": "UNCLASSIFIED",
"LOG_LEVEL": "info",
"SECURITY_MARKING": "add",
"SHUTDOWN_TIMEOUT": "30s",
"TAK_CONNECT_TIMEOUT_MS": "10000",
"TAK_ENDPOINT": "some-tak-server.com:8089",
"TAK_INSECURE_SKIP_VERIFY": "false",
"TAK_PROTOCOL": "auto",
"TAK_READ_TIMEOUT_MS": "0",
"TAK_RECONNECT_BACKOFF_MS": "1000",
"TAK_RECONNECT_MAX_BACKOFF_MS": "30000",
"TAK_RECONNECT_MAX_RETRIES": "5",
"TAK_TLS_ENABLED": "true",
"TAK_WRITE_TIMEOUT_MS": "5000"
},
"replicas": 1
},
"position": {
"x": -948,
"y": 458.6666666666666
}
},
{
"uid": "3e9efa01-4953-49c7-a08a-4fa64443bcfc",
"display_name": "CoT XML to WDM Protobuf",
"description": "Converts Cursor-on-Target (CoT) XML messages to Raft Warfighting Data Model (WDM) protobuf.",
"configuration": {
"environment": {
"CLASSIFICATION_LEVEL": "UNCLASSIFIED",
"LOG_LEVEL": "info",
"SECURITY_MARKING": "add",
"WDM_SIMULATED": "false"
},
"replicas": 1
},
"position": {
"x": -144.05069308747835,
"y": 432.2345697965192
}
},
{
"uid": "f2a1a99a-b2c1-4a81-a1cc-de4fdb449470",
"display_name": "Publish to WDM",
"description": "Publishes entities/tasks to [R]DP's WDM API.",
"configuration": {
"environment": {
"CLASSIFICATION_LEVEL": "UNCLASSIFIED",
"CONCURRENT_BATCH_WRITES": "false",
"CONCURRENT_BATCH_WRITES_MAX_CONCURRENCY": "10",
"INPUT_FORMAT": "wdmproto",
"LOG_LEVEL": "info",
"MAPPING_FILE_PATH": "/config/mapping.json",
"PARALLEL_WRITE_MAX_CONCURRENCY": "16",
"PARTITIONED_WRITE_NUM_PARTITIONS": "8",
"SECURITY_MARKING": "add",
"TRANSFORM_MODE": "default",
"WDM_AUTH_SERVER_URL": "http://df-api-gateway/api/v1/auth/token",
"WDM_MAX_RETRIES": "3",
"WDM_PROVENANCE_NAME": "Raft Data Platform",
"WDM_PROVENANCE_SOURCE_ID": "Raft Data Platform",
"WDM_SERVER_URL": "http://df-core",
"WDM_TIMEOUT_MS": "30000",
"WRITE_CONCURRENCY_MODE": "partitioned"
},
"replicas": 1,
"file_configuration": {
"mapping": {
"base64_encoded_text": ""
}
}
},
"position": {
"x": 600.3135164612133,
"y": 441.7549504225129
}
}
],
"datasets": [
{
"uid": "07612660-e124-4026-b15b-01d147f44dee",
"display_name": "Ingested TAK Messages",
"description": "Each message represents a single TAK message ingested from the remote source.",
"configuration": {
"ref": "tak.landing",
"conn_type": "INTERNAL_KAFKA"
},
"position": {
"x": -540.3269088596635,
"y": 440.0018360782116
},
"catalog": {
"datasource_id": "fe02644a-2c10-4e7e-a9b0-3eaad32ccea7",
"enablement_id": "fd36293f-42bd-4d55-8a2b-eb266987acc7",
"dataset_id": "18120485-b5d6-4964-9337-309613476e0f"
}
},
{
"uid": "74fec918-f4bc-4ae8-87b1-5ba2079603ca",
"display_name": "CoT WDM Messages",
"description": "TAK messages transformed from CoT XML into WDM (Protobuf)",
"configuration": {
"ref": "tak.landing",
"conn_type": "INTERNAL_KAFKA"
},
"position": {
"x": 253.37383265007168,
"y": 435.7351153046987
},
"catalog": {
"datasource_id": "fe02644a-2c10-4e7e-a9b0-3eaad32ccea7",
"enablement_id": "fd36293f-42bd-4d55-8a2b-eb266987acc7",
"dataset_id": "18120485-b5d6-4964-9337-309613476e0f"
}
}
],
"connections": [
{
"from": {
"id": "6be9e029-10d2-4eb5-8132-c4dece414bc2",
"type": "Transformer",
"conn": "KAFKA_OUT_TOPIC"
},
"to": {
"id": "07612660-e124-4026-b15b-01d147f44dee",
"type": "Dataset",
"conn": "input"
}
},
{
"from": {
"id": "07612660-e124-4026-b15b-01d147f44dee",
"type": "Dataset",
"conn": ""
},
"to": {
"id": "3e9efa01-4953-49c7-a08a-4fa64443bcfc",
"type": "Transformer",
"conn": "KAFKA_IN_TOPIC"
}
},
{
"from": {
"id": "3e9efa01-4953-49c7-a08a-4fa64443bcfc",
"type": "Transformer",
"conn": "KAFKA_OUT_TOPIC"
},
"to": {
"id": "74fec918-f4bc-4ae8-87b1-5ba2079603ca",
"type": "Dataset",
"conn": ""
}
},
{
"from": {
"id": "74fec918-f4bc-4ae8-87b1-5ba2079603ca",
"type": "Dataset",
"conn": ""
},
"to": {
"id": "f2a1a99a-b2c1-4a81-a1cc-de4fdb449470",
"type": "Transformer",
"conn": "KAFKA_IN_TOPIC"
}
}
]
}