-- Template: HBase result (sink) table definition.
-- Each column is declared as "columnFamily:columnName columnType";
-- repeat the pattern (the "..." placeholder) for every column to write.
CREATE TABLE MyResult(
colFamily:colName colType,
...
)WITH(
-- fixed value selecting the HBase sink plugin
type ='hbase',
-- ZooKeeper quorum of the HBase cluster; comma-separated ip:port pairs
zookeeperQuorum ='ip:port[,ip:port]',
-- target HBase table name
tableName ='tableName',
-- column(s) whose values build the rowkey; per the note below,
-- multiple values are joined with '-'
rowKey ='colName[,colName]',
-- sink operator parallelism (string-valued, defaults to 1)
parallelism ='1',
-- znode parent path of the HBase cluster in ZooKeeper
zookeeperParent ='/hbase'
)
适用的 HBase 版本:hbase 2.0
| 参数名称 | 含义 |
|---|---|
| tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称 |
| colFamily:colName | hbase中的列族名称和列名称 |
| colType | 列的数据类型,须为 colType 支持的类型(示例中使用了 varchar、int) |
| 参数名称 | 含义 | 是否必填 | 默认值 |
|---|---|---|---|
| type | 表明输出表类型 [mysql \| hbase \| elasticsearch] | 是 | |
| zookeeperQuorum | hbase zk地址,多个直接用逗号隔开 | 是 | |
| zookeeperParent | zkParent 路径 | 是 | |
| tableName | 关联的hbase表名称 | 是 | |
| rowKey | hbase的rowkey关联的列信息,多个值以逗号隔开 | 是 | |
| updateMode | APPEND:不回撤数据,只下发增量数据;UPSERT:先删除回撤数据,然后更新 | 否 | APPEND |
| parallelism | 并行度设置 | 否 | 1 |
-- Source table: consumes records from Kafka topic 'mqTest01'.
-- NOTE(review): 'kafka10' presumably selects the Kafka 0.10.x source
-- plugin — confirm against the connector's plugin list.
CREATE TABLE MyTable(
name varchar,
channel varchar,
age int
)WITH(
type ='kafka10',
-- Kafka broker address list
bootstrapServers ='172.16.8.107:9092',
-- ZooKeeper address (with chroot) of the Kafka cluster
zookeeperQuorum ='172.16.8.107:2181/kafka',
-- where to start when no committed offset exists
offsetReset ='latest',
topic ='mqTest01',
timezone='Asia/Shanghai',
-- append-only stream; no retraction messages are produced
updateMode ='append',
enableKeyPartitions ='false',
-- 'false': topic above is a literal name, not a regex pattern
topicIsPattern ='false',
parallelism ='1'
);
-- Sink table: writes into HBase table 'myresult', column family 'cf',
-- qualifiers 'name' and 'channel'.
CREATE TABLE MyResult(
cf:name varchar ,
cf:channel varchar
)WITH(
type ='hbase',
-- ZooKeeper quorum of the target HBase cluster
zookeeperQuorum ='172.16.10.104:2181,172.16.10.224:2181,172.16.10.252:2181',
zookeeperParent ='/hbase',
tableName ='myresult',
-- NOTE(review): partitionedJoin is not listed in the parameter table
-- above — verify this key is actually supported by the hbase sink
partitionedJoin ='false',
parallelism ='1',
-- rowkey is built from these column values joined with '-' (name-channel)
rowKey='name,channel'
);
-- Stream rows from the Kafka source into the HBase sink.
-- Fix: select the columns in the sink's declared order (cf:name, cf:channel).
-- The original selected (channel, name); under SQL's positional INSERT
-- column mapping that would swap the two qualifier values, contradicting
-- the sample `scan` output below (cf:name=roc, cf:channel=daishu).
-- Also dropped the unused table alias and added the statement terminator.
INSERT INTO MyResult
SELECT
    name,
    channel
FROM MyTable;
hbase 的 rowkey 构建规则:以 rowKey 中声明的字段的值作为 key;声明多个字段时,各字段值以 '-' 连接
hbase(main):007:0> scan 'myresult'
ROW COLUMN+CELL
roc-daishu column=cf:channel, timestamp=1589183971724, value=daishu
roc-daishu column=cf:name, timestamp=1589183971724, value=roc