Kafka Connect 配置source task and sink task 操作手册

Ordiy Lv5

config sink

1
2
3
deleted_at datetime default null
created_at datetime default now

config sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
  # NOTE: lines starting with "#" are reader annotations only. JSON has no comment
  # syntax, so strip them before submitting this config to Kafka Connect.

  # Transforms are applied in the order listed here.
  "transforms": "InsertTimestamp,keyToValue,TimestampConverterEnd,InsertDefault,TimestampConverterDeleted",

  # Add the record timestamp to the value as "msg_event_time".
  "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTimestamp.timestamp.field": "msg_event_time",

  # Presumably copies the record key into the value under "id" -- confirm against
  # the ClickHouse connector documentation.
  "transforms.keyToValue.type": "com.clickhouse.kafka.connect.transforms.KeyToValue",
  "transforms.keyToValue.field": "id",

  # Convert the "created_at" string to a Unix (long) timestamp.
  "transforms.TimestampConverterEnd.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.TimestampConverterEnd.field": "created_at",
  "transforms.TimestampConverterEnd.format": "yyyy-MM-dd'T'HH:mm:ss.SSSX",
  "transforms.TimestampConverterEnd.target.type": "unix",

  # deleted_at may be empty; replace it with a default value.
  # NOTE(review): the field configured below is "device_first_seen_at", not
  # "deleted_at" -- confirm which field is intended.
  # NOTE(review): "skip.if.exists" does not appear among InsertField's documented
  # options -- verify that it has any effect before relying on it.
  "transforms.InsertDefault.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertDefault.static.field": "device_first_seen_at",
  "transforms.InsertDefault.static.value": "1970-01-01T00:00:00.000Z",
  "transforms.InsertDefault.skip.if.exists": false,

  # Convert the string to a long (Unix) timestamp.
  "transforms.TimestampConverterDeleted.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.TimestampConverterDeleted.field": "device_first_seen_at",
  "transforms.TimestampConverterDeleted.format": "yyyy-MM-dd'T'HH:mm:ss.SSSX",
  "transforms.TimestampConverterDeleted.target.type": "unix"
}
  • Title: Kafka Connect 配置source task and sink task 操作手册
  • Author: Ordiy
  • Created at : 2025-04-29 20:17:01
  • Updated at : 2025-07-03 12:20:41
  • Link: https://ordiy.github.io/posts/2024-10-10-kafka-connect-config-source-and-sink-task/
  • License: This work is licensed under CC BY 4.0.
On this page
Kafka Connect 配置source task and sink task 操作手册