Redis

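The Redis connector supports streaming writes and dimension-table (lookup) joins.

FLINK-1.12

DDL syntax

1. Create tables

When a Redis table is used as a sink, connector.command must be configured.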

CREATE TABLE dim_table (
    user_id VARCHAR,
    age varchar
) WITH (
    'connector' = 'redis',
    'host' = '10.122.173.131',
    'port' = '6379',
    'mode' = 'single',
    'db.index' = '0',
    'password' = 'xxx',
    'lookup.cache.type' = 'all',
    'lookup.cache.ttl' = '3000',
    'primary.key' = 'user_id'
);

CREATE TABLE sink (
    item_id VARCHAR,
    item_type varchar
) WITH (
    'connector' = 'redis',
    'host' = '10.122.173.131',
    'port' = '6379',
    'mode' = 'single',
    'db.index' = '0',
    'password' = 'xxx',
    'lookup.cache.type' = 'all',
    'lookup.cache.ttl' = '3000',
    'primary.key' = 'item_id',
    'connector.command' = 'SET'
);
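
As a rough sketch of a streaming write into the sink table defined above, the statement below selects two columns from user_log, the source table used in the join examples. It assumes that item_id becomes the Redis key through primary.key and that the remaining column is stored as the value written by SET. The column choice and the casts are illustrative and are not taken from the connector's documentation.

```sql
-- Hypothetical streaming write: one SET per incoming row.
-- Assumes user_log exposes item_id and category_id, as in the join examples.
INSERT INTO sink
SELECT
    CAST(item_id AS VARCHAR)     AS item_id,    -- assumed to become the Redis key (primary.key)
    CAST(category_id AS VARCHAR) AS item_type   -- assumed to become the value written by SET
FROM user_log;
```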

2. Dimension-table join

INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
  u.ts,
  cast(w.age as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.age > 10
) as d
GROUP BY d.ts, d.age;
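
The query above adds PROCTIME() inside a subquery. An alternative, sketched here, is to declare the processing-time attribute once as a computed column in the source DDL so that the join can reference it directly. The column types and the Kafka option values are assumptions, because the definition of user_log is not shown for this version.

```sql
-- Assumed definition of user_log; column types and option values are placeholders.
CREATE TABLE user_log (
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR,
    ts          TIMESTAMP(3),       -- assumed type
    proc AS PROCTIME()              -- processing-time attribute used by the lookup join
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'xxx',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- The join can then use u.proc without a wrapping subquery.
SELECT u.ts, CAST(w.age AS INT) AS age, u.user_id, u.behavior
FROM user_log AS u
JOIN dim_table FOR SYSTEM_TIME AS OF u.proc AS w
    ON u.user_id = w.user_id;
```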

Metahub syntax

1. Create tables

The Redis command name is used as the property prefix.

set 'SADD.db.index' ='4';
set 'SADD.primary.key' = 'item_id';
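
The two statements above use SADD as the prefix. Following the same pattern, properties for another command would presumably be prefixed with that command's name. The sketch below applies this to HSET, the command targeted by the INSERT example in this section. Whether these exact keys are accepted for HSET is an assumption; the original does not state it.

```sql
-- Hypothetical: the same properties, prefixed with HSET instead of SADD.
set 'HSET.db.index' = '4';
set 'HSET.primary.key' = 'item_id';
```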

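2. Dimension-table join

When Redis registered through Metahub is used as a dimension table, the Redis command is GET; the join key column is named k and the value column is named v.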

INSERT INTO sloth_redis_test.mem.`HSET`
select d.ts as item_id, cast(d.age as string) as item_type, cast(count(*) as string) as `redis.value`
from (
SELECT
  u.ts,
  cast(w.v as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join sloth_redis_test.`default`.`GET` for system_time as of u.proc as w
on u.user_id = w.k
where w.v > 10
) as d
GROUP BY d.ts, d.age;

Supported commands

    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'xxx',
    'connector.properties.bootstrap.servers' = 'xxx',
    'connector.properties.group.id' = 'xxx',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

-- dim
CREATE TABLE dim_table (
    user_id VARCHAR,
    age varchar
) WITH (
    'connector.type' = 'redis',
    'host' = '*',
    'port' = '*',
    'mode' = 'single',
    'db.index' = '0',
    'password' = '*',
    'connector.lookup.cache.type' = 'all',
    'connector.lookup.cache.ttl' = '3000'
);

-- sink
CREATE TABLE pvuvage_sink (
    dt VARCHAR,
    age INT,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'connector.table' = 'pvuv_age_sink_redis',
    'connector.username' = '*',
    'connector.password' = '*',
    'connector.write.flush.max-rows' = '1'
);

INSERT INTO pvuvage_sink
select DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
  u.ts,
  cast(w.age as int) as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
left join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.age > 10
) as d
GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00'), d.age;
  • Official Flink documentation on dimension-table joins
  • WITH parameters

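    Each entry gives the parameter name, followed by its default value, type, and description.

    host
    (none) String Redis server host name. It can be given as ip1:port1,ip2:port2, in which case the port property is ignored, or as ip1,ip2, in which case the port from the port property is used.
    port
    (none) Integer Default port of the Redis server.
    password
    (none) String Password for the Redis server.
    mode
    (none) String Redis server mode, for example single, cluster, or sentinel.
    db.index
    (none) Integer Index of the Redis database to use.
    primary.key
    (none) String Field that is mapped to the Redis key.
    connector.command
    (none) String Redis command type.
    (none) Integer
    master
    (none) String
    lookup.cache.max-rows
    (none) Integer Maximum number of keys to cache.
    lookup.cache.ttl
    (none) Integer Retention time of cached entries; configurable when the cache type is lru.
    lookup.max-retries
    (none) Integer Maximum number of lookups against Redis after a cache miss.
    lookup.cache.type
    (none) String Cache type; one of none, lru, all.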
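
    mode
    Redis cluster mode. Optional; the default is single. Three modes are supported: single, sentinel, and cluster. In cluster and sentinel modes, separate the hosts with commas, e.g. host,host,host...; sentinel mode also requires master to be set.
    master
    Master address. Required only when mode = 'sentinel'.
    password
    Redis password. Optional when mode is single or sentinel.
    db.index
    The Redis db to use. Optional; the default is 0.
    connector.lookup.cache.type
    Optional; the default is 'none'. Supported values: 'all', 'lru', 'none'.
    connector.lookup.cache.max-rows
    Maximum number of cached rows. The default is 10000; effective only when the cache type is lru.
    connector.lookup.cache.ttl
    With 'lru', the cache expiry time (by default entries never expire); with 'all', the reload interval. Optional; by default there is no reload.
    connector.lookup.max-retries
    Maximum number of retries when a dim-table lookup fails. Optional; the default is 3.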
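
To show how several of these parameters might combine, here is a sketch of a dimension table in sentinel mode with an LRU lookup cache, written in the FLINK-1.12 DDL style. The table name and all values are placeholders. The unit of lookup.cache.ttl is not stated in this document, so the number is illustrative only.

```sql
CREATE TABLE dim_table_sentinel (      -- hypothetical table name
    user_id VARCHAR,
    age     VARCHAR
) WITH (
    'connector' = 'redis',
    'mode' = 'sentinel',
    'host' = 'ip1:port1,ip2:port2',    -- ports given inline, so 'port' is omitted
    'master' = 'xxx',                  -- needed in sentinel mode
    'password' = 'xxx',
    'db.index' = '0',
    'primary.key' = 'user_id',
    'lookup.cache.type' = 'lru',
    'lookup.cache.max-rows' = '10000', -- upper bound on cached keys
    'lookup.cache.ttl' = '3000',       -- retention time of cached entries (unit not stated)
    'lookup.max-retries' = '3'
);
```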
     