-- Template for a Kafka source table. Replace every placeholder (tableName,
-- colName, colType, ip:port, topicName, parallelNum, ...) with real values;
-- "..." stands for any number of additional column definitions.
CREATE TABLE tableName(
    colName colType,
    ...
    -- computed column derived from existing columns (system functions or registered UDFs)
    function(colNameX) AS aliasName,
    -- event-time watermark on colName, allowing delayTime (ms) of lateness
    WATERMARK FOR colName AS withOffset( colName , delayTime )
) WITH (
    type = 'kafka09',
    bootstrapServers = 'ip:port,ip:port...',
    zookeeperQuorum = 'ip:port,ip:port/zkparent',
    offsetReset = 'latest',
    topic = 'topicName',
    groupId = 'test',
    parallelism = 'parallelNum',
    timezone = 'Asia/Shanghai',
    -- optional; dt_nest is used when omitted
    sourcedatatype = 'dt_nest'
);
支持的版本:kafka09、kafka10、kafka11 及以上版本。
kafka读取和写入的版本必须一致,否则会有兼容性错误。
| 参数名称 | 含义 |
|---|---|
| tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称 |
| colName | 列名称 |
| colType | 列类型,参见 colType 支持的类型 |
| function(colNameX) as aliasName | 支持在定义列信息的时候根据已有列类型生成新的列(函数可以使用系统函数和已经注册的UDF) |
| WATERMARK FOR colName AS withOffset( colName , delayTime ) | 标识输入流的 watermark 生成规则:根据指定的 colName(当前支持的列类型为 Long 或 Timestamp)和 delayTime 生成 watermark,同时在注册表时附带上 rowtime 字段(如果未指定则默认添加 proctime 字段)。注意:添加该标识时必须设置系统参数 time.characteristic:EventTime。delayTime:数据最大延迟时间(ms) |
| 参数名称 | 含义 | 是否必填 | 默认值 |
|---|---|---|---|
| type | kafka 版本类型,例如 kafka09;可选值:kafka09、kafka10、kafka11、kafka(对应 kafka1.0 及以上版本) | 是 | |
| groupId | 需要读取的 groupId 名称 | 否 | |
| bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开) | 是 | |
| zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔) | 是 | |
| topic | 需要读取的 topic 名称 | 是 | |
| topicIsPattern | topic 是否为正则表达式格式(true 或 false) | 否 | false |
| offsetReset | 读取 topic 时 offset 的初始位置,可选 latest、earliest 或指定 offset 值;指定值的格式为 {"partition_no":offset_value},例如 {"0":12312,"1":12321,"2":12312} | 否 | latest |
| parallelism | 并行度设置 | 否 | 1 |
| sourcedatatype | 数据类型,avro,csv,json,dt_nest。dt_nest为默认JSON解析器,能够解析嵌套JSON数据类型,其他仅支持非嵌套格式 | 否 | dt_nest |
| schemaInfo | avro类型使用的schema信息 | 否 | |
| fieldDelimiter | csv类型使用的数据分隔符 | 否 | |
| timezone | 时区设置,参见 timezone 支持的参数 | 否 | 'Asia/Shanghai' |

kafka 相关参数可以自定义,以 kafka. 为前缀即可,例如:
kafka.consumer.id
kafka.socket.timeout.ms
kafka.fetch.message.max.bytes
kafka.num.consumer.fetchers
kafka.auto.commit.enable
kafka.auto.commit.interval.ms
kafka.queued.max.message.chunks
kafka.rebalance.max.retries
kafka.fetch.min.bytes
kafka.fetch.wait.max.ms
kafka.rebalance.backoff.ms
kafka.refresh.leader.backoff.ms
kafka.consumer.timeout.ms
kafka.exclude.internal.topics
kafka.partition.assignment.strategy
kafka.client.id
kafka.zookeeper.session.timeout.ms
kafka.zookeeper.connection.timeout.ms
kafka.zookeeper.sync.time.ms
kafka.offsets.storage
kafka.offsets.channel.backoff.ms
kafka.offsets.channel.socket.timeout.ms
kafka.offsets.commit.max.retries
kafka.dual.commit.enabled
kafka.socket.receive.buffer.bytes
### kerberos 认证相关参数
kafka.security.protocol
kafka.sasl.mechanism
kafka.sasl.kerberos.service.name
-- Flat (non-nested) JSON source subscribed to three topics. To subscribe by
-- regular expression instead, replace the topic line with the two commented
-- lines below it (each already ends with the comma the next property needs).
CREATE TABLE MyTable(
    name VARCHAR,
    channel VARCHAR,
    pv INT,
    xctime BIGINT,
    -- computed column: character length of channel
    CHARACTER_LENGTH(channel) AS timeLeng
) WITH (
    type = 'kafka09',
    bootstrapServers = '172.16.8.198:9092',
    zookeeperQuorum = '172.16.8.198:2181/kafka',
    offsetReset = 'latest',
    topic = 'nbTest1,nbTest2,nbTest3',
    -- topic = 'mqTest.*',
    -- topicIsPattern = 'true',
    parallelism = '1',
    -- optional; dt_nest is used when omitted
    sourcedatatype = 'json'
);
嵌套json解析示例
json: {"name":"tom", "obj":{"channel": "root"}, "pv": 4, "xctime":1572932485}
-- Nested JSON example: the path obj.channel picks the "channel" field of the
-- nested "obj" object and registers it under the column name channel, which
-- the computed column below then refers to.
CREATE TABLE MyTable(
    name VARCHAR,
    obj.channel VARCHAR AS channel,
    pv INT,
    xctime BIGINT,
    CHARACTER_LENGTH(channel) AS timeLeng
) WITH (
    type = 'kafka09',
    bootstrapServers = '172.16.8.198:9092',
    zookeeperQuorum = '172.16.8.198:2181/kafka',
    offsetReset = 'latest',
    groupId = 'nbTest',
    topic = 'nbTest1,nbTest2,nbTest3',
    -- regex subscription alternative:
    -- topic = 'mqTest.*',
    -- topicIsPattern = 'true',
    parallelism = '1'
);
数组类型字段解析示例
json: {"name":"tom", "obj":{"channel": "root"}, "user": [{"pv": 4}, {"pv": 10}], "xctime":1572932485}
-- Array-of-objects example: user[1].pv reads the pv field of one element of
-- the "user" array (index base is not shown here; confirm against the parser)
-- and exposes it as column pv. obj.channel reads a nested object field.
CREATE TABLE MyTable(
    name VARCHAR,
    obj.channel VARCHAR AS channel,
    user[1].pv INT AS pv,
    xctime BIGINT,
    CHARACTER_LENGTH(channel) AS timeLeng
) WITH (
    type = 'kafka09',
    bootstrapServers = '172.16.8.198:9092',
    zookeeperQuorum = '172.16.8.198:2181/kafka',
    offsetReset = 'latest',
    groupId = 'nbTest',
    topic = 'nbTest1,nbTest2,nbTest3',
    -- regex subscription alternative:
    -- topic = 'mqTest.*',
    -- topicIsPattern = 'true',
    parallelism = '1'
);
or
json: {"name":"tom", "obj":{"channel": "root"}, "pv": [4, 7, 10], "xctime":1572932485}
-- Array-of-scalars example: pv[1] reads one element of the "pv" array
-- (index base is not shown here; confirm against the parser) and exposes it
-- as column pv. obj.channel reads a nested object field.
CREATE TABLE MyTable(
    name VARCHAR,
    obj.channel VARCHAR AS channel,
    pv[1] INT AS pv,
    xctime BIGINT,
    CHARACTER_LENGTH(channel) AS timeLeng
) WITH (
    type = 'kafka09',
    bootstrapServers = '172.16.8.198:9092',
    zookeeperQuorum = '172.16.8.198:2181/kafka',
    offsetReset = 'latest',
    groupId = 'nbTest',
    topic = 'nbTest1,nbTest2,nbTest3',
    -- regex subscription alternative:
    -- topic = 'mqTest.*',
    -- topicIsPattern = 'true',
    parallelism = '1'
);
-- CSV source on a single topic. To subscribe by regular expression instead,
-- replace the topic line with the two commented lines below it (each already
-- ends with the comma the next property needs).
CREATE TABLE MyTable(
    name VARCHAR,
    channel VARCHAR,
    pv INT,
    xctime BIGINT,
    -- computed column: character length of channel
    CHARACTER_LENGTH(channel) AS timeLeng
) WITH (
    type = 'kafka09',
    bootstrapServers = '172.16.8.198:9092',
    zookeeperQuorum = '172.16.8.198:2181/kafka',
    offsetReset = 'latest',
    topic = 'nbTest1',
    -- topic = 'mqTest.*',
    -- topicIsPattern = 'true',
    parallelism = '1',
    sourceDatatype = 'csv'
);
-- Avro source on Kafka 1.0+ (type = 'kafka'). The columns must match the
-- fields declared in schemaInfo (channel and pv, both strings).
-- Custom Kafka properties use the "kafka." prefix; kafka.group.id is kept
-- equal to groupId so the example does not name two different consumer groups.
CREATE TABLE MyTable(
    channel VARCHAR,
    pv VARCHAR
) WITH (
    type = 'kafka',
    bootstrapServers = '172.16.8.107:9092',
    groupId = 'mqTest01',
    offsetReset = 'latest',
    topic = 'mqTest01',
    parallelism = '1',
    topicIsPattern = 'false',
    kafka.group.id = 'mqTest01',
    sourceDataType = 'avro',
    schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"},{"name":"pv","type":"string"}]}'
);