Quick Start Guide

Use the version of the [R]DP package that matches the [R]DP instance you are integrating with. The examples below show common Transformation Engine operations in Go; each example is a complete, standalone program.

Create a Transformer Template

package main

import (
	"context"
	"fmt"
	"io"

	rdp "github.com/raft-tech/rdp-sdk"
)

func main() {
	config := rdp.NewConfiguration()
	config.Servers = []rdp.ServerConfiguration{
		{URL: "https://rdp-example.com"},
	}

	client := rdp.NewAPIClient(config)

	ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
		"rdp_api_key": {
			Key: "my-rdp-api-key"
		},
	})

	var tr rdp.V1PipelinesTransformerRecord
	tr.SetUid("new-transformer-abc")
	tr.SetName("My New Transformer")
	tr.SetDescription("Transforms data.")
	tr.SetStatus("available")

	var env rdp.V1PipelinesEnvironmentTpl
	env.SetName("LOG_LEVEL")
	env.SetRequired(true)
	env.SetDescription("The log level of this application. Must be set at startup.")

	var conf rdp.V1PipelinesTConfigurationTpl
	conf.SetEnvironment([]rdp.V1PipelinesEnvironmentTpl{env})
	tr.SetConfiguration(conf)

	var input rdp.V1PipelinesDataConnTpl
	input.SetDisplayName("Incoming messages")
	input.SetConnType("INTERNAL_KAFKA") // INTERNAL_KAFKA, INTERNAL_MINIO, INTERNAL_ICEBERG

	var output rdp.V1PipelinesDataConnTpl
	output.SetDisplayName("Outgoing files")
	output.SetConnType("INTERNAL_MINIO") // INTERNAL_KAFKA, INTERNAL_MINIO, INTERNAL_ICEBERG

	tr.SetInputs(map[string]rdp.V1PipelinesDataConnTpl{
		"INPUT_KAFKA_TOPIC": input,
	})
	tr.SetOutputs(map[string]rdp.V1PipelinesDataConnTpl{
		"OUTPUT_MINIO_BUCKET": output,
	})

	var ji rdp.V1PipelinesImageConfig
	ji.SetImage("my-container-image")
	ji.SetPullPolicy("IfNotPresent")
	ji.SetDefaultReplicas(1)

	var inst rdp.V1PipelinesCreationConfig
	inst.SetJobImage(ji)

	tr.SetInstantiation(inst)

	resp, r, err := client.V1PipelinesTransformerAPI.V1PipelinesTransformerPost(ctx).V1PipelinesTransformerRecord(tr).Execute()
	if err != nil {
		b, _ := io.ReadAll(r.Body)
		fmt.Println(string(b))
		panic(err)
	}

	fmt.Printf("Created Transformer with UID '%s'\n", *resp.Uid)
}

Retrieve a Transformer

package main

import (
	"context"
	"fmt"

	rdp "github.com/raft-tech/rdp-sdk"
)

func main() {
	config := rdp.NewConfiguration()
	config.Servers = []rdp.ServerConfiguration{
		{URL: "https://rdp-example.com"},
	}

	client := rdp.NewAPIClient(config)

	ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
		"rdp_api_key": {
			Key: "my-rdp-api-key"
		},
	})

	tr, _, err := client.V1PipelinesTransformerAPI.V1PipelinesTransformerUidGet(ctx, "new-transformer-abc").Execute()
	if err != nil {
		panic(err)
	}

	fmt.Printf("Found transformer: %+v\n", tr)
}

List all Pipelines

package main
import (
	"context"
	"fmt"

	rdp "github.com/raft-tech/rdp-sdk"
)

func main() {
	config := rdp.NewConfiguration()
	config.Servers = []rdp.ServerConfiguration{
		{URL: "https://rdp-example.com"},
	}

	client := rdp.NewAPIClient(config)

	ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
		"rdp_api_key": {
			Key: "my-rdp-api-key"
		},
	})

	pipelines, _, err := client.V2PipelinesPipelineAPI.V2PipelinesInstanceGetAll(ctx).Execute()
	if err != nil {
		panic(err)
	}

	fmt.Printf("Found %d pipelines.\n", len(pipelines))
}

Get the current status of a Pipeline

package main

import (
	"context"
	"fmt"

	rdp "github.com/raft-tech/rdp-sdk"
)

// main requests the current status of one pipeline, identified by UID, and
// prints it.
func main() {
	config := rdp.NewConfiguration()
	config.Servers = []rdp.ServerConfiguration{
		{URL: "https://rdp-example.com"},
	}

	client := rdp.NewAPIClient(config)

	// The API key travels with the request context under rdp.ContextAPIKeys.
	ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
		"rdp_api_key": {
			Key: "my-api-key",
		},
	})

	uid := "my-pipeline-uid"
	status, _, err := client.V2PipelinesPipelineAPI.V2PipelinesInstanceStatusGet(ctx, uid).Execute()
	if err != nil {
		panic(err)
	}

	// End the line so the output does not run into the shell prompt.
	fmt.Printf("Status for pipeline %s: %+v\n", uid, status)
}

Create a Pipeline

package main

import (
	"context"

	rdp "github.com/raft-tech/rdp-sdk"
)

// main assembles a pipeline definition with two transformer instances and one
// dataset, connects them, and submits the definition for creation.
func main() {
	cfg := rdp.NewConfiguration()
	cfg.Servers = []rdp.ServerConfiguration{
		{URL: "https://rdp-example.com"},
	}
	api := rdp.NewAPIClient(cfg).V2PipelinesPipelineAPI

	// The API key travels with the request context under rdp.ContextAPIKeys.
	apiKeys := map[string]rdp.APIKey{
		"rdp_api_key": {Key: "my-api-key"},
	}
	ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, apiKeys)

	// Nodes of the pipeline: two transformer instances and one dataset.
	source := rdp.NewV2PipelinesTransformerInstancePost("example-source-template", "source")
	sink := rdp.NewV2PipelinesTransformerInstancePost("example-sink-template", "sink")
	dataset := rdp.NewV2PipelinesDatasetRefPost("dataset")

	// Connections between the nodes: "source" output to "dataset" input, then
	// "dataset" output to "sink" input.
	sourceToDataset := rdp.NewV2PipelinesConnection(
		*rdp.NewV2PipelinesConnectionEndpoint("output", "source", "transformer"),
		*rdp.NewV2PipelinesConnectionEndpoint("input", "dataset", "dataset"),
	)
	datasetToSink := rdp.NewV2PipelinesConnection(
		*rdp.NewV2PipelinesConnectionEndpoint("output", "dataset", "dataset"),
		*rdp.NewV2PipelinesConnectionEndpoint("input", "sink", "transformer"),
	)

	spec := rdp.NewV2PipelinesPipeline()
	spec.SetName("example-pipeline")
	spec.SetDescription("example pipeline")
	spec.SetTransformers([]rdp.V2PipelinesTransformerInstancePost{*source, *sink})
	spec.SetDatasets([]rdp.V2PipelinesDatasetRefPost{*dataset})
	spec.SetConnections([]rdp.V2PipelinesConnection{*sourceToDataset, *datasetToSink})

	// Only the error matters here; the created record and HTTP response are
	// discarded.
	_, _, err := api.V2PipelinesInstanceCreatePost(ctx).V2PipelinesPipeline(*spec).Execute()
	if err != nil {
		panic(err)
	}
}

Manage a Pipeline’s state

package main

import (
	"context"

	rdp "github.com/raft-tech/rdp-sdk"
)

// main issues start, stop, and restart requests, in that order, for a single
// pipeline, panicking on the first request that returns an error.
func main() {
	cfg := rdp.NewConfiguration()
	cfg.Servers = []rdp.ServerConfiguration{
		{URL: "https://rdp-example.com"},
	}
	api := rdp.NewAPIClient(cfg).V2PipelinesPipelineAPI

	// The API key travels with the request context under rdp.ContextAPIKeys.
	apiKeys := map[string]rdp.APIKey{
		"rdp_api_key": {Key: "my-api-key"},
	}
	ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, apiKeys)

	pipelineUID := "my-pipeline-uid"

	// Start the pipeline.
	_, err := api.V2PipelinesInstanceStartPost(ctx, pipelineUID).Execute()
	if err != nil {
		panic(err)
	}

	// Stop the pipeline.
	_, err = api.V2PipelinesInstanceStopPost(ctx, pipelineUID).Execute()
	if err != nil {
		panic(err)
	}

	// Restart the pipeline.
	_, err = api.V2PipelinesInstanceRestartPost(ctx, pipelineUID).Execute()
	if err != nil {
		panic(err)
	}
}