To remove the before, after, and op keys from the data that Flink SQL writes to Kafka, configure the job with the following steps:
1. When creating the table in Flink SQL, set the connector to Kafka and put the serialization (serde) properties in the WITH clause.
2. In the serde properties, set "key.deserializer" and "value.deserializer" to a custom Deserializer class.
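3. Implement the custom Deserializer class and override its deserialize method so that it processes the incoming key and value and strips the parts that are not needed.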
The detailed steps are as follows:
1. Create the table with the Kafka connector and set the serde properties in the WITH clause:
CREATE TABLE kafka_table (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  -- Caution: the options below are not among the documented Kafka connector
  -- or JSON format options, so Flink may reject them when validating the table.
  'json.nullkey' = '\N',
  'json.valueschema' = '{"type":"struct","fields":[{"name":"before","type":"string"},{"name":"after","type":"string"},{"name":"op","type":"string"}]}',
  'serde' = 'com.example.CustomKafkaDeserializer',
  'serde.properties.schema.registry.url' = 'http://localhost:8081'
);
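The table definition can also be assembled in code, which keeps the option names in one place. The following is a minimal sketch rather than a definitive setup: the class KafkaSinkDdl, its methods, and the column list in the usage note are hypothetical, and the topic and broker address are the placeholders used above. It assumes the query feeding the table is append-only, in which case the plain 'json' format writes each row as a flat JSON object with no before/after/op wrapper.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that renders a Flink SQL CREATE TABLE statement. */
final class KafkaSinkDdl {
    private KafkaSinkDdl() {}

    /** Builds "CREATE TABLE t (cols) WITH ('k' = 'v', ...)" from the given parts. */
    static String build(String table, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add(quote(e.getKey()) + " = " + quote(e.getValue()));
        }
        return "CREATE TABLE " + table + " (" + columns + ") " + with;
    }

    /** Options for a plain-JSON Kafka table; insertion order is preserved. */
    static Map<String, String> plainJsonOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "your_topic");
        options.put("properties.bootstrap.servers", "localhost:9092");
        options.put("format", "json");
        return options;
    }

    /** Wraps a value in single quotes, doubling any embedded single quote. */
    private static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }
}
```

Calling KafkaSinkDdl.build("kafka_table", "id BIGINT, name STRING", KafkaSinkDdl.plainJsonOptions()) yields a statement that could then be passed to TableEnvironment.executeSql.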
2. Implement the custom Deserializer class:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Implements KafkaDeserializationSchema because deserialize() in step 3
// receives a ConsumerRecord rather than a raw byte array.
public class CustomKafkaDeserializer implements KafkaDeserializationSchema<Row> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Names of the envelope fields to strip.
    private final String beforeKey = "before";
    private final String afterKey = "after";
    private final String opKey = "op";

    // Fallback values used when an envelope field is missing.
    private final String beforeValueDefault;
    private final String afterValueDefault;
    private final String opValueDefault;

    // Type information for the rows this schema produces.
    private final TypeInformation<Row> rowTypeInfo;

    public CustomKafkaDeserializer(TypeInformation<Row> rowTypeInfo,
                                   String beforeValueDefault,
                                   String afterValueDefault,
                                   String opValueDefault) {
        this.rowTypeInfo = rowTypeInfo;
        this.beforeValueDefault = beforeValueDefault;
        this.afterValueDefault = afterValueDefault;
        this.opValueDefault = opValueDefault;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        // A Kafka topic is treated as an unbounded stream.
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return rowTypeInfo;
    }

    // ...other fields and methods...
}
3. In the custom Deserializer class, override the deserialize method so that it processes the incoming key and value and strips the parts that are not needed.
@Override
public Row deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    try {
        JsonNode jsonNode = objectMapper.readTree(record.value());
        String before = jsonNode != null && jsonNode.has(beforeKey)
                ? jsonNode.get(beforeKey).asText()
                : beforeValueDefault;
        // If needed, set a default value or handle error cases here.
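As a library-free illustration of the "strip the unneeded parts" step, the sketch below operates on a record that has already been parsed into a Map. It assumes a Debezium-style envelope in which before and after hold nested objects and op is one of c, u, d, or r. The class EnvelopeUnwrapper and its unwrap method are hypothetical names and are not part of Flink or Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that strips the before/after/op envelope from a parsed record. */
final class EnvelopeUnwrapper {
    private EnvelopeUnwrapper() {}

    /**
     * Returns a copy of the row image carried by the envelope: "before" for
     * deletes (op = "d"), "after" for every other operation. Returns null
     * when that image is missing or is not a nested object.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> unwrap(Map<String, Object> envelope) {
        Object op = envelope.get("op");
        String imageKey = "d".equals(op) ? "before" : "after";
        Object image = envelope.get(imageKey);
        if (!(image instanceof Map)) {
            return null;
        }
        // Copy so that callers cannot mutate the original envelope.
        return new LinkedHashMap<>((Map<String, Object>) image);
    }
}
```

The returned map contains only the row's own columns, so serializing it as JSON produces a message without the before, after, and op keys. Inside a real deserialize method the same selection logic would be applied to the JsonNode read from record.value().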