Pipeline

Pipelines are used by the Transformation Engine to orchestrate Sources, Transformers, Sinks, and Datasets.

Overview

A Pipeline is a directed graph that describes how Sources, Transformers, Sinks, and Datasets are connected in the runtime environment. Creating a Pipeline involves configuring each Transformer according to the Transformer Template that it was registered with. When a Pipeline is submitted, runtime validation is performed to ensure the graph is valid; validation can fail for several reasons:

  • The configuration for a Transformer is incomplete according to its Transformer Template.

  • An input or output connection on a Transformer is of the wrong type (for example, connecting an INTERNAL_KAFKA Dataset to an input of type INTERNAL_ICEBERG).

  • An underlying storage medium in [R]DP is unavailable to the Transformation Engine.

Data Model

Like Transformer Templates, Pipelines are represented as JSON objects that document the desired Transformers, Datasets, and connections. To dig deeper, take the example Pipeline shown earlier:

Example Pipeline

The JSON object that represents this Pipeline looks like this:

{
    "uid": "6d769fce-fa56-402c-bc36-36ffd69b7b3b",
    "name": "Developer Demo",
    "description": "",
    "state": "inactive",
    "priority": "LOW",
    "security_markings": "UNCLASSIFIED",
    "mock": false,
    "created_by": "urn:rdp:keycloak-user:admin",
    "annotations": [
        {
            "key": "rdp.catalog.datasource",
            "value": "fe02644a-2c10-4e7e-a9b0-3eaad32ccea7"
        },
        {
            "key": "rdp.catalog.enablement",
            "value": "fd36293f-42bd-4d55-8a2b-eb266987acc7"
        },
        {
            "key": "rdp.catalog.dataset",
            "value": "18120485-b5d6-4964-9337-309613476e0f"
        }
    ],
    "transformers": [
        {
            "uid": "6be9e029-10d2-4eb5-8132-c4dece414bc2",
            "display_name": "TAK Consumer",
            "description": "Consumes Cursor on Target (CoT) XML messages from TAK.",
            "configuration": {
                "environment": {
                    "CLASSIFICATION_LEVEL": "UNCLASSIFIED",
                    "LOG_LEVEL": "info",
                    "SECURITY_MARKING": "add",
                    "SHUTDOWN_TIMEOUT": "30s",
                    "TAK_CONNECT_TIMEOUT_MS": "10000",
                    "TAK_ENDPOINT": "some-tak-server.com:8089",
                    "TAK_INSECURE_SKIP_VERIFY": "false",
                    "TAK_PROTOCOL": "auto",
                    "TAK_READ_TIMEOUT_MS": "0",
                    "TAK_RECONNECT_BACKOFF_MS": "1000",
                    "TAK_RECONNECT_MAX_BACKOFF_MS": "30000",
                    "TAK_RECONNECT_MAX_RETRIES": "5",
                    "TAK_TLS_ENABLED": "true",
                    "TAK_WRITE_TIMEOUT_MS": "5000"
                },
                "replicas": 1
            },
            "position": {
                "x": -948,
                "y": 458.6666666666666
            }
        },
        {
            "uid": "3e9efa01-4953-49c7-a08a-4fa64443bcfc",
            "display_name": "CoT XML to WDM Protobuf",
            "description": "Converts Cursor-on-Target (CoT) XML messages to Raft Warfighting Data Model (WDM) protobuf.",
            "configuration": {
                "environment": {
                    "CLASSIFICATION_LEVEL": "UNCLASSIFIED",
                    "LOG_LEVEL": "info",
                    "SECURITY_MARKING": "add",
                    "WDM_SIMULATED": "false"
                },
                "replicas": 1
            },
            "position": {
                "x": -144.05069308747835,
                "y": 432.2345697965192
            }
        },
        {
            "uid": "f2a1a99a-b2c1-4a81-a1cc-de4fdb449470",
            "display_name": "Publish to WDM",
            "description": "Publishes entities/tasks to [R]DP's WDM API.",
            "configuration": {
                "environment": {
                    "CLASSIFICATION_LEVEL": "UNCLASSIFIED",
                    "CONCURRENT_BATCH_WRITES": "false",
                    "CONCURRENT_BATCH_WRITES_MAX_CONCURRENCY": "10",
                    "INPUT_FORMAT": "wdmproto",
                    "LOG_LEVEL": "info",
                    "MAPPING_FILE_PATH": "/config/mapping.json",
                    "PARALLEL_WRITE_MAX_CONCURRENCY": "16",
                    "PARTITIONED_WRITE_NUM_PARTITIONS": "8",
                    "SECURITY_MARKING": "add",
                    "TRANSFORM_MODE": "default",
                    "WDM_AUTH_SERVER_URL": "http://df-api-gateway/api/v1/auth/token",
                    "WDM_MAX_RETRIES": "3",
                    "WDM_PROVENANCE_NAME": "Raft Data Platform",
                    "WDM_PROVENANCE_SOURCE_ID": "Raft Data Platform",
                    "WDM_SERVER_URL": "http://df-core",
                    "WDM_TIMEOUT_MS": "30000",
                    "WRITE_CONCURRENCY_MODE": "partitioned"
                },
                "replicas": 1,
                "file_configuration": {
                    "mapping": {
                        "base64_encoded_text": ""
                    }
                }
            },
            "position": {
                "x": 600.3135164612133,
                "y": 441.7549504225129
            }
        }
    ],
    "datasets": [
        {
            "uid": "07612660-e124-4026-b15b-01d147f44dee",
            "display_name": "Ingested TAK Messages",
            "description": "Each message represents a single TAK message ingested from the remote source.",
            "configuration": {
                "ref": "tak.landing",
                "conn_type": "INTERNAL_KAFKA"
            },
            "position": {
                "x": -540.3269088596635,
                "y": 440.0018360782116
            },
            "catalog": {
                "datasource_id": "fe02644a-2c10-4e7e-a9b0-3eaad32ccea7",
                "enablement_id": "fd36293f-42bd-4d55-8a2b-eb266987acc7",
                "dataset_id": "18120485-b5d6-4964-9337-309613476e0f"
            }
        },
        {
            "uid": "74fec918-f4bc-4ae8-87b1-5ba2079603ca",
            "display_name": "CoT WDM Messages",
            "description": "TAK messages transformed from CoT XML into WDM (Protobuf)",
            "configuration": {
                "ref": "tak.landing",
                "conn_type": "INTERNAL_KAFKA"
            },
            "position": {
                "x": 253.37383265007168,
                "y": 435.7351153046987
            },
            "catalog": {
                "datasource_id": "fe02644a-2c10-4e7e-a9b0-3eaad32ccea7",
                "enablement_id": "fd36293f-42bd-4d55-8a2b-eb266987acc7",
                "dataset_id": "18120485-b5d6-4964-9337-309613476e0f"
            }
        }
    ],
    "connections": [
        {
            "from": {
                "id": "6be9e029-10d2-4eb5-8132-c4dece414bc2",
                "type": "Transformer",
                "conn": "KAFKA_OUT_TOPIC"
            },
            "to": {
                "id": "07612660-e124-4026-b15b-01d147f44dee",
                "type": "Dataset",
                "conn": "input"
            }
        },
        {
            "from": {
                "id": "07612660-e124-4026-b15b-01d147f44dee",
                "type": "Dataset",
                "conn": ""
            },
            "to": {
                "id": "3e9efa01-4953-49c7-a08a-4fa64443bcfc",
                "type": "Transformer",
                "conn": "KAFKA_IN_TOPIC"
            }
        },
        {
            "from": {
                "id": "3e9efa01-4953-49c7-a08a-4fa64443bcfc",
                "type": "Transformer",
                "conn": "KAFKA_OUT_TOPIC"
            },
            "to": {
                "id": "74fec918-f4bc-4ae8-87b1-5ba2079603ca",
                "type": "Dataset",
                "conn": ""
            }
        },
        {
            "from": {
                "id": "74fec918-f4bc-4ae8-87b1-5ba2079603ca",
                "type": "Dataset",
                "conn": ""
            },
            "to": {
                "id": "f2a1a99a-b2c1-4a81-a1cc-de4fdb449470",
                "type": "Transformer",
                "conn": "KAFKA_IN_TOPIC"
            }
        }
    ]
}