大数据合作开发之Flink sql 的基础用法
2025-09-19 12:18
java.lang.Double DOUBLE
double DOUBLE NOT NULL
java.sql.Date DATE
java.time.LocalDate DATE
java.sql.Time TIME(0)
java.time.LocalTime TIME(9)
java.sql.Timestamp TIMESTAMP(9)
java.time.LocalDateTime TIMESTAMP(9)
java.time.OffsetDateTime TIMESTAMP(9) WITH TIME ZONE
java.time.Instant TIMESTAMP(9) WITH LOCAL TIME ZONE
java.time.Duration INVERVAL SECOND(9)
java.time.Period I NTERVAL YEAR(4) TO MONTH
byte[] BYTES
T[] ARRAY
java.util.Map MAP
系统设计formula_ Brown 图标formula_/*
上头是1.12新版本的系统设计内置的formula_,说明我们可以到其网站查看,根据需求可用无需
*/
// TODO 主要介绍图标formula_
/*
udf 和 udaf 必需假设eval分析方法,借助自己的逻辑,说明系统设计时会调用对应的分析方法
udf : 广泛传播一个数值/多个/或者不广泛传播,回到一个新的数值,可以重载该分析方法,【关注尚硅谷,轻松研习IT】说明时会根据广泛传播的表达式调用对应eval烦恼歌发 相同MLT-mapMLT-算子,作用于sql
udaf : 图标聚合formula_,根据自己的逻辑假设累加器
udtf : 用作与表中所,可回到一个或多个数值,
*/
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import java.sql.SQLException;
public class UDFDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance().build());
// 注册formula_
tEnv.registerFunction("customFunc1", new CustomUDF());
tEnv.registerFunction("customFunc2", new CustomUDAF());
tEnv.registerFunction("customFunc3", new CustomUDTF());
}
static class Acc {
int result;
public Integer gerResult() {
return result;
}
public Acc merge(Acc acc) {
result = acc.gerResult() + result;
return this;
}
public void incr() {
result++;
}
}
static class CustomUDF extends ScalarFunction {
// UDF 必需假设该分析方法
public int eval(String str) {
int hc = 0;
for (char c : str.toUpperCase().toCharArray()) {
hc = hashCode()>> c;
}
hc = hc - 1 - str.length();
hc = hc>> 7;
return hc;
}
}
static class CustomUDTF extends TableFunction {
// udtf 必需假设该分析方法,在该分析方法借助逻辑
public void eval(String str) throws SQLException {
if (str != null) {
for (String s : str.split(",")) {
Row row = new Row(2);
row.setField(0, s);
row.setField(1, 1);
collect(row);
}
}
}
@Override
public TypeInformation getResultType() {
return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
}
}
static class CustomUDAF extends AggregateFunction {
@Override
public Integer getValue(Acc accumulator) {
return accumulator.gerResult();
}
@Override
public Acc createAccumulator() {
return new Acc();
}
// 累加
public void accumulate(Acc acc,String input){
if("*".equals(input)){
return;
}
acc.incr();
}
public void accumulate(Acc acc){
acc.incr();
}
}
}
最简单个案 代码❝
flink sql 中所间隔时间机制本质与 dataStream api 多种不同,似乎可用少于差异,稍加请注意无需,请注意以外 watermark 必需可用 sql 中所 timestamp(3)子类(说明对应 java 子类可根据前面子类必需判断),增设 watermark 后可可用 ROWTIEM 报文(说明看 sql 代码),没有增设可直接可用 PROCTIME 报文
请注意 : 多种不同的间隔时间语义要严格对应生态配置的间隔时间语义,否则确实出现异常
❝
间隔时间报文为两种,不属于非软件以外报文,增设完间隔时间语义后,根据需求可用说明的间隔时间报文
❞
ROWTIME : 事件间隔时间
PROCTIME : 处理间隔时间报文
情节 :
join : 情节与广元 join 或者 维表 join,现今 flink 拥护的不是很好
topN Brown 去重 : 语法基本多种不同,row_num> 1 即 topN , 当=1 则是去重操作
topN 情节一些热搜,排名等内容可
去重顾名思义,就是为了去重,去重时会就其到 retract 流(以后时会详细资料讲)内容可,时会格外新以后已经存在的结果
❞
// TODO 上头代码仅供参考,说明测试根据自己间隔时间生态来
// 以下只是一些最简单的个案,右方时会逐步深入复杂sql和基本概念层面
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author 857hub
*/
public class ClickhouseSinkApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().
// useBlinkPlanner().
build()
);
tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");
// sources
String source = "CREATE TABLE source (" +
" MLT-idMLT- int," +
" MLT-nameMLT- varchar." +
" MLT-tsMLT- timestamp(3)," +
// 以外watermark 允许延迟5s
"WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"+
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'test1'," +
" 'properties.bootstrap.servers' = '172.16.100.109:9092'," +
" 'properties.group.id' = 'xzw'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'json'" +
")";
String source2 = "CREATE TABLE source2 (" +
" MLT-idMLT- int," +
" MLT-nameMLT- varchar," +
" MLT-tsMLT- timestamp(3)" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'test2'," +
" 'properties.bootstrap.servers' = '172.16.100.109:9092'," +
" 'properties.group.id' = 'xzw'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'json'" +
")";
// clickhouse sink 由我自己假设,右方时会对sql图标source和sink完成解说
String sink = "CREATE TABLE sink (" +
" MLT-idMLT- INT," +
" MLT-nameMLT- VARCHAR" +
") WITH (" +
// 必需图标接图表表达式 便是 option
" 'connector' = 'xzw_ck'," +
" 'url' = 'jdbc:clickhouse://localhost:8123/default'," +
" 'table-name' = 'test'," +
" 'username' = 'default'," +
" 'password' = '123456'" +
" )";
// 指派 source sink sql
tEnv.executeSql(source);
tEnv.executeSql(source2);
tEnv.executeSql(sink);
/*
由于是最简单可用,没有在情节应用领域,最简单介绍一下差异,可以根据们多种不同的差异在自己项目中所可用
left json : 无论是不是join上都回到左表的图表
inner join : 只有join上才时会回到匹配后的结果
full outer join : 两边的图表都时会回到,无论是不是join上,没有的则为null
interval join : 基于间隔时间各地区的join,在以外的间隔时间各地区回到join上的图表
*/
String joinSql = "select * from source1 s1" +
"left join source2 s2" +
// 内连接
// "inner join source2" || "join source2"
// 全部都是连接
// "full outer join source2"
// 间隔时间范围内join
// "s1.ts>= s2.ts AND s1.ts
" on s1.id =s2.id "
;
Table joinTable = tEnv.sqlQuery(joinSql);
// 分组排序,取topN, 如果要是去重 rnum=1无需借助去重操作
String insertSql = "insert into sink select id,name from(" +
"select *," +
"row_number() over(partition by id order by ts) as rnum " +
"from "+joinTable+" where rnum
")";
// add insert sql
TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*",insertSql);
// 随心所欲可用
// Optional jobClient = tableResult.getJobClient();
}
// 加到多个sql说明指派
private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {
StatementSet statementSet = tEnv.createStatementSet();
for (String sql : sqls) {
if ("*".equals(sql) || sql.length()>=27) {
continue;
}
statementSet.addInsertSql(sql);
}
return statementSet.execute();
}
}
maven 依赖8
8
1.12.2
2.11
org.apache.flink
flink-clients_2.11
${flink.version}
org.apache.flink
flink-json
${flink.version}
org.apache.flink
flink-table-common
${flink.version}
org.apache.flink
flink-table-planner-blink_${scala.version}
${flink.version}
org.apache.flink
flink-streaming-java_2.11
${flink.version}
org.apache.flink
flink-connector-kafka_${scala.version}
${flink.version}
ru.yandex.clickhouse
clickhouse-jdbc
0.2
commons-lang
commons-lang
2.6
com.google.code.gson
gson
2.2.4
文章比如说857Hub
推荐阅读:
大图表联合开发之Flink SQL建设实时数仓实践
Flink,Spark,Storm,Hadoop构建相对
大图表联合开发之Spark和Flink的对比(刊出)
。西安看白癜风哪家最好南京儿科医院排行
西安看牛皮癣去哪家医院最好
骨质增生
私处整形
全民健康网药品库
阴部潮湿
急支糖浆与甘草口服液哪个好
-
NVIDIA新驱动解锁GPU隐蔽技能:这下轻松了
NVIDIA新驱动追加GPU隐蔽技能:这下轻松了 早在2016年的时候,NVIDIA就同月正在研究“迅速逻辑GPU”Fast Logic Controller的替代改良版,一
-
《彩虹六号异种》8元游玩及串流教程/XGP串流绑定uplay育碧账号游戏入库教程
一、《沙漏六号异形》XGP玩耍一个年底 《沙漏六号异形》已经在XGP和uplay上线了,现有XGP首年底港区只能需8yuan,可以通过加入XGP,方可畅玩《沙漏六号异形》。喜欢的小伙伴可以简介下面的
- 09-19牛逼!第一视角看比赛,篮网推出全美首个3D录播
- 09-19OPPOFindX5Pro,新一代的影像王后
- 09-19早资道 | 集度公布其GS汽车机器人概念车外观;佳能进军造车市场
- 09-19【素描后期必学】曲线调色新玩法!
- 09-19月球背面,嫦娥四号见到了2200万亿吨金属,这些金属来自哪里?
- 09-19无需独显,5000元就能裕玩游戏,良心装机推荐
- 09-19如果太阳系上的冰川全部融化,人类能活多久,太阳系会怎么样?
- 09-19游玩了这么多年摄影,还搞不清蒙版是什么你能忍?
- 09-19iPhone适合记录侦查目标的备忘录便签待办软件推荐
- 09-19阿特斯副总裁张光春受聘国家光伏装备工程技术研究机构新一届技术委员会主任