Quick Start Guide
Use the [R]DP package that matches the [R]DP instance you are integrating with. The examples below show various Transformation Engine operations in Go.
Create a Transformer Template
package main
import (
"context"
"fmt"
"io"
rdp "github.com/raft-tech/rdp-sdk"
)
func main() {
config := rdp.NewConfiguration()
config.Servers = []rdp.ServerConfiguration{
{URL: "https://rdp-example.com"},
}
client := rdp.NewAPIClient(config)
ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
"rdp_api_key": {
Key: "my-rdp-api-key"
},
})
var tr rdp.V1PipelinesTransformerRecord
tr.SetUid("new-transformer-abc")
tr.SetName("My New Transformer")
tr.SetDescription("Transforms data.")
tr.SetStatus("available")
var env rdp.V1PipelinesEnvironmentTpl
env.SetName("LOG_LEVEL")
env.SetRequired(true)
env.SetDescription("The log level of this application. Must be set at startup.")
var conf rdp.V1PipelinesTConfigurationTpl
conf.SetEnvironment([]rdp.V1PipelinesEnvironmentTpl{env})
tr.SetConfiguration(conf)
var input rdp.V1PipelinesDataConnTpl
input.SetDisplayName("Incoming messages")
input.SetConnType("INTERNAL_KAFKA") // INTERNAL_KAFKA, INTERNAL_MINIO, INTERNAL_ICEBERG
var output rdp.V1PipelinesDataConnTpl
output.SetDisplayName("Outgoing files")
output.SetConnType("INTERNAL_MINIO") // INTERNAL_KAFKA, INTERNAL_MINIO, INTERNAL_ICEBERG
tr.SetInputs(map[string]rdp.V1PipelinesDataConnTpl{
"INPUT_KAFKA_TOPIC": input,
})
tr.SetOutputs(map[string]rdp.V1PipelinesDataConnTpl{
"OUTPUT_MINIO_BUCKET": output,
})
var ji rdp.V1PipelinesImageConfig
ji.SetImage("my-container-image")
ji.SetPullPolicy("IfNotPresent")
ji.SetDefaultReplicas(1)
var inst rdp.V1PipelinesCreationConfig
inst.SetJobImage(ji)
tr.SetInstantiation(inst)
resp, r, err := client.V1PipelinesTransformerAPI.V1PipelinesTransformerPost(ctx).V1PipelinesTransformerRecord(tr).Execute()
if err != nil {
b, _ := io.ReadAll(r.Body)
fmt.Println(string(b))
panic(err)
}
fmt.Printf("Created Transformer with UID '%s'\n", *resp.Uid)
}
Retrieve a Transformer
package main
import (
"context"
"fmt"
rdp "github.com/raft-tech/rdp-sdk"
)
func main() {
config := rdp.NewConfiguration()
config.Servers = []rdp.ServerConfiguration{
{URL: "https://rdp-example.com"},
}
client := rdp.NewAPIClient(config)
ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
"rdp_api_key": {
Key: "my-rdp-api-key"
},
})
tr, _, err := client.V1PipelinesTransformerAPI.V1PipelinesTransformerUidGet(ctx, "new-transformer-abc").Execute()
if err != nil {
panic(err)
}
fmt.Printf("Found transformer: %+v\n", tr)
}
List all Pipelines
package main
import (
"context"
"fmt"
rdp "github.com/raft-tech/rdp-sdk"
)
func main() {
config := rdp.NewConfiguration()
config.Servers = []rdp.ServerConfiguration{
{URL: "https://rdp-example.com"},
}
client := rdp.NewAPIClient(config)
ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
"rdp_api_key": {
Key: "my-rdp-api-key"
},
})
pipelines, _, err := client.V2PipelinesPipelineAPI.V2PipelinesInstanceGetAll(ctx).Execute()
if err != nil {
panic(err)
}
fmt.Printf("Found %d pipelines.\n", len(pipelines))
}
Get the current status of a Pipeline
package main
import (
"context"
"fmt"
rdp "github.com/raft-tech/rdp-sdk"
)
func main() {
config := rdp.NewConfiguration()
config.Servers = []rdp.ServerConfiguration{
{URL: "https://rdp-example.com"},
}
client := rdp.NewAPIClient(config)
ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
"rdp_api_key": {
Key: "my-api-key",
},
})
uid := "my-pipeline-uid"
status, _, err := client.V2PipelinesPipelineAPI.V2PipelinesInstanceStatusGet(ctx, uid).Execute()
if err != nil {
panic(err)
}
fmt.Printf("Status for pipeline %s: %+v", uid, status)
}
Create a Pipeline
package main
import (
"context"
rdp "github.com/raft-tech/rdp-sdk"
)
func main() {
config := rdp.NewConfiguration()
config.Servers = []rdp.ServerConfiguration{
{URL: "https://rdp-example.com"},
}
client := rdp.NewAPIClient(config)
ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
"rdp_api_key": {Key: "my-api-key"},
})
pipeline := rdp.NewV2PipelinesPipeline()
pipeline.SetName("example-pipeline")
pipeline.SetDescription("example pipeline")
pipeline.SetTransformers([]rdp.V2PipelinesTransformerInstancePost{
*rdp.NewV2PipelinesTransformerInstancePost("example-source-template", "source"),
*rdp.NewV2PipelinesTransformerInstancePost("example-sink-template", "sink"),
})
pipeline.SetDatasets([]rdp.V2PipelinesDatasetRefPost{
*rdp.NewV2PipelinesDatasetRefPost("dataset"),
})
pipeline.SetConnections([]rdp.V2PipelinesConnection{
*rdp.NewV2PipelinesConnection(
*rdp.NewV2PipelinesConnectionEndpoint("output", "source", "transformer"),
*rdp.NewV2PipelinesConnectionEndpoint("input", "dataset", "dataset"),
),
*rdp.NewV2PipelinesConnection(
*rdp.NewV2PipelinesConnectionEndpoint("output", "dataset", "dataset"),
*rdp.NewV2PipelinesConnectionEndpoint("input", "sink", "transformer"),
),
})
pipelines := client.V2PipelinesPipelineAPI
if _, _, err := pipelines.V2PipelinesInstanceCreatePost(ctx).
V2PipelinesPipeline(*pipeline).
Execute(); err != nil {
panic(err)
}
}
Manage a Pipeline’s state
package main
import (
"context"
rdp "github.com/raft-tech/rdp-sdk"
)
func main() {
config := rdp.NewConfiguration()
config.Servers = []rdp.ServerConfiguration{
{URL: "https://rdp-example.com"},
}
client := rdp.NewAPIClient(config)
ctx := context.WithValue(context.Background(), rdp.ContextAPIKeys, map[string]rdp.APIKey{
"rdp_api_key": {Key: "my-api-key"},
})
uid := "my-pipeline-uid"
pipelines := client.V2PipelinesPipelineAPI
if _, err := pipelines.V2PipelinesInstanceStartPost(ctx, uid).Execute(); err != nil {
panic(err)
}
if _, err := pipelines.V2PipelinesInstanceStopPost(ctx, uid).Execute(); err != nil {
panic(err)
}
if _, err := pipelines.V2PipelinesInstanceRestartPost(ctx, uid).Execute(); err != nil {
panic(err)
}
}