Skip to content

Latest commit

 

History

History
112 lines (101 loc) · 2.74 KB

File metadata and controls

112 lines (101 loc) · 2.74 KB

1.格式:

CREATE TABLE tableName(
    colName colType,
    ...
    colNameX colType
 )WITH(
    type ='redis',
    url = 'ip:port',
    database ='dbName',
    password ='pwd',
    redisType='1',
    tableName ='tableName',
    parallelism ='parllNum'
 );

2.支持版本

redis5.0

3.表结构定义

参数名称 含义
tableName 在 sql 中使用的名称;即注册到flink-table-env上的名称
colName 列名称,redis中存储为 表名:主键名:主键值:列名]
colType 列类型,当前只支持varchar

4.参数:

参数名称 含义 是否必填 默认值
type 表名 输出表类型[mysq|hbase|elasticsearch|redis]
url redis 的地址;格式ip:port[,ip:port]
password redis 的密码
redisType redis模式(1 单机,2 哨兵, 3 集群)
masterName 主节点名称(哨兵模式下为必填项)
database reids 的数据库地址
tableName redis 的表名称
parallelism 并行度设置 1
timeout 连接超时时间 10000
maxTotal 最大连接数 8
maxIdle 最大空闲连接数 8
minIdle 最小空闲连接数
masterName 哨兵模式下的masterName
primarykeys 主键字段,多个字段以逗号分割

5.样例:

 CREATE TABLE MyTable(
     name varchar,
     channel varchar
  )WITH(
     type ='kafka10',
     bootstrapServers ='172.16.8.107:9092',
     zookeeperQuorum ='172.16.8.107:2181/kafka',
     offsetReset ='latest',
     topic ='mqTest01',
     timezone='Asia/Shanghai',
     updateMode ='append',
     enableKeyPartitions ='false',
     topicIsPattern ='false',
     parallelism ='1'
  );
 
 CREATE TABLE MyResult(
     channel VARCHAR,
     pv VARCHAR
  )WITH(
     type ='redis',
     primarykeys='name',
     redisType ='1',
     url ='172.16.8.109:6379',
     tableName ='resultTable',
     partitionedJoin ='false',
     parallelism ='1',
     database ='0',
     timeout ='10000',
     maxTotal ='60000',
     maxIdle='8',
     minIdle='0'
  );
 
 insert          
 into
     MyResult
     select
         channel,
         name as pv                                             
     from
         MyTable a                                        

6.redis完整样例

redis数据说明

redis使用k-v格式存储,key的构建格式为tableName:privateKey:privateKeyValue:columnName, value=columnValue

源表数据内容

{"name":"roc","channel":"daishu","age":2}

redis实际数据内容

127.0.0.1:6379> keys *
1) "resultTable:name:roc:name"
2) "resultTable:name:roc:channel"
127.0.0.1:6379> get "resultTable:name:roc:name"
"roc"
127.0.0.1:6379> get "resultTable:name:roc:channel"
"daishu"