
Big Data Development: Basic Usage of Flink SQL

2025-09-19 12:18

Java type                    Flink SQL type
java.lang.Double             DOUBLE
double                       DOUBLE NOT NULL
java.sql.Date                DATE
java.time.LocalDate          DATE
java.sql.Time                TIME(0)
java.time.LocalTime          TIME(9)
java.sql.Timestamp           TIMESTAMP(9)
java.time.LocalDateTime      TIMESTAMP(9)
java.time.OffsetDateTime     TIMESTAMP(9) WITH TIME ZONE
java.time.Instant            TIMESTAMP(9) WITH LOCAL TIME ZONE
java.time.Duration           INTERVAL SECOND(9)
java.time.Period             INTERVAL YEAR(4) TO MONTH
byte[]                       BYTES
T[]                          ARRAY<T>
java.util.Map<K, V>          MAP<K, V>
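To see the mapping at work, here is a minimal sketch (class and variable names are made up): a DataStream of java.lang.Double becomes a DOUBLE column when the stream is converted to a table, matching the table above.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TypeMappingDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // java.lang.Double elements ...
        DataStream<Double> stream = env.fromElements(1.0, 2.0, 3.0);

        // ... are derived as a DOUBLE column once the stream becomes a Table.
        Table table = tEnv.fromDataStream(stream);
        table.printSchema(); // prints the derived schema, e.g. `f0` DOUBLE
    }
}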

Flink functions and user-defined functions

/*
The mappings above are built into the new Flink 1.12 release; the full
reference is on the official website, use it as your needs dictate.
*/

// TODO This section mainly covers user-defined functions.

/*
A UDF and a UDAF must implement an eval method containing your own logic;
Flink calls the matching method at runtime.

udf  : takes one value, several values, or none, and returns a new value.
       eval can be overloaded; Flink picks the overload matching the
       arguments passed in. Similar to the `map` operator, but used in SQL.

udaf : a user-defined aggregate function; implement the accumulator with
       your own logic.

udtf : a table function applied to a table; it can return one or more rows.
*/

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.sql.SQLException;

public class UDFDemo {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().build());

        // Register the functions so they can be called from SQL.
        tEnv.registerFunction("customFunc1", new CustomUDF());
        tEnv.registerFunction("customFunc2", new CustomUDAF());
        tEnv.registerFunction("customFunc3", new CustomUDTF());
    }

    // Accumulator used by the UDAF below.
    static class Acc {
        int result;

        public Integer getResult() {
            return result;
        }

        public Acc merge(Acc acc) {
            result = acc.getResult() + result;
            return this;
        }

        public void incr() {
            result++;
        }
    }

    static class CustomUDF extends ScalarFunction {
        // A UDF must implement an eval method.
        public int eval(String str) {
            // Toy hash logic, only to demonstrate a scalar function.
            int hc = 0;
            for (char c : str.toUpperCase().toCharArray()) {
                hc = hashCode() >> c;
            }
            hc = hc - 1 - str.length();
            hc = hc >> 7;
            return hc;
        }
    }

    static class CustomUDTF extends TableFunction<Row> {
        // A UDTF must implement an eval method containing the logic.
        public void eval(String str) throws SQLException {
            if (str != null) {
                // Emit one row of (token, 1) per comma-separated token.
                for (String s : str.split(",")) {
                    Row row = new Row(2);
                    row.setField(0, s);
                    row.setField(1, 1);
                    collect(row);
                }
            }
        }

        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        }
    }

    static class CustomUDAF extends AggregateFunction<Integer, Acc> {
        @Override
        public Integer getValue(Acc accumulator) {
            return accumulator.getResult();
        }

        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        // Accumulate: count every input except the "*" placeholder.
        public void accumulate(Acc acc, String input) {
            if ("*".equals(input)) {
                return;
            }
            acc.incr();
        }

        public void accumulate(Acc acc) {
            acc.incr();
        }
    }
}
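For completeness, a minimal usage sketch of the three registered functions in SQL. It assumes a table `words` with a STRING column `word` has already been registered (both names are hypothetical) and would go at the end of main above, after the registerFunction calls:

// Scalar function (UDF): one output value per input row.
tEnv.sqlQuery("select customFunc1(word) from words");
// Aggregate function (UDAF): one output value per group.
tEnv.sqlQuery("select word, customFunc2(word) from words group by word");
// Table function (UDTF): expands each row via a lateral join.
tEnv.sqlQuery("select word, s, c from words, lateral table(customFunc3(word)) as T(s, c)");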

Simple example code

The time mechanism in Flink SQL is essentially the same as in the DataStream API; there are only minor differences in usage, so a little care is enough. Note that a watermark must be defined on a column of SQL type TIMESTAMP(3) (the matching Java type can be checked against the mapping table above). Once a watermark is defined you can use the ROWTIME attribute (see the SQL code below); without one you can use the PROCTIME attribute directly.

Note: the time semantics you use must strictly match the time characteristic configured in the environment, otherwise exceptions will occur.

There are two kinds of time attribute; they are built-in rather than ordinary user-defined fields. After the time semantics are set, use whichever one your use case needs:

ROWTIME : the event-time attribute

PROCTIME : the processing-time attribute
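As an illustration, a DDL sketch defining both attributes on one table, to run against a StreamTableEnvironment `tEnv` like the ones in this post (the table name, columns, and the datagen connector are placeholders):

String timeDdl = "CREATE TABLE events (" +
        "  `id` INT," +
        "  `ts` TIMESTAMP(3)," +
        // computed column: a processing-time attribute needs no source field
        "  `pt` AS PROCTIME()," +
        // the watermark turns `ts` into an event-time (ROWTIME) attribute
        "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
        ") WITH (" +
        "  'connector' = 'datagen'" +
        ")";
tEnv.executeSql(timeDdl);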

Scenarios:

join : stream-to-stream joins or dimension-table joins; Flink's support here is currently not great.

topN and deduplication : the syntax is essentially the same; a row_number bound greater than 1 gives a topN, while a bound of exactly 1 performs deduplication.

topN fits scenarios such as trending searches and rankings.

Deduplication does what its name says: it removes duplicates. It relies on the retract stream (covered in detail later) and updates results that were already emitted.
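A sketch of the deduplication pattern (the table `src` and its columns are made up): the same row_number construction as topN, filtered to exactly 1.

// Keep only the first row per id, ordered by event time.
String dedupSql = "select id, name from (" +
        "  select *, row_number() over(partition by id order by ts asc) as rnum" +
        "  from src" +
        ") where rnum = 1";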

// TODO The code below is for reference only; test it against your own environment.

// These are just some simple examples; later posts will dig into more complex SQL and the underlying concepts.

import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author 857hub
 */
public class ClickhouseSinkApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().
                        // useBlinkPlanner().
                        build()
        );
        tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");

        // sources
        String source = "CREATE TABLE source (" +
                "  `id` int," +
                "  `name` varchar," +
                "  `ts` timestamp(3)," +
                // define the watermark, allowing 5s of lateness
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'test1'," +
                "  'properties.bootstrap.servers' = '172.16.100.109:9092'," +
                "  'properties.group.id' = 'xzw'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")";
        String source2 = "CREATE TABLE source2 (" +
                "  `id` int," +
                "  `name` varchar," +
                "  `ts` timestamp(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'test2'," +
                "  'properties.bootstrap.servers' = '172.16.100.109:9092'," +
                "  'properties.group.id' = 'xzw'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")";
        // The clickhouse sink is a connector I implemented myself; a later
        // post will explain custom SQL sources and sinks.
        String sink = "CREATE TABLE sink (" +
                "  `id` INT," +
                "  `name` VARCHAR" +
                ") WITH (" +
                // custom connector options
                "  'connector' = 'xzw_ck'," +
                "  'url' = 'jdbc:clickhouse://localhost:8123/default'," +
                "  'table-name' = 'test'," +
                "  'username' = 'default'," +
                "  'password' = '123456'" +
                ")";

        // execute the source and sink DDL
        tEnv.executeSql(source);
        tEnv.executeSql(source2);
        tEnv.executeSql(sink);

        /*
        Since this is a minimal example there is no real business scenario;
        here is a short summary of the join types so you can pick the right
        one for your own project:
        left join       : returns rows of the left table whether or not they match
        inner join      : returns only the rows that match
        full outer join : returns rows from both sides whether or not they match;
                          the missing side is null
        interval join   : a join bounded by a time range; returns the matches
                          that fall inside the configured time bounds
        */
        String joinSql = "select * from source s1 " +
                "left join source2 s2 " +
                // inner join
                // "inner join source2" || "join source2"
                // full outer join
                // "full outer join source2"
                // interval join over a time range, e.g. (bounds are an example):
                // "s1.ts >= s2.ts AND s1.ts < s2.ts + INTERVAL '10' SECOND"
                "on s1.id = s2.id";
        Table joinTable = tEnv.sqlQuery(joinSql);

        // Partition, sort and take the topN; with rnum = 1 the same pattern
        // performs deduplication instead. (N = 3 here is an example value.)
        String insertSql = "insert into sink select id,name from (" +
                "select *, " +
                "row_number() over(partition by id order by ts) as rnum " +
                "from " + joinTable +
                ") where rnum <= 3";

        // add the insert sqls
        TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*", insertSql);
        // use as needed
        // Optional<JobClient> jobClient = tableResult.getJobClient();
    }

    // Batch several insert statements together and execute them as one job.
    private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {
        StatementSet statementSet = tEnv.createStatementSet();
        for (String sql : sqls) {
            // skip "*" placeholder entries
            if ("*".equals(sql)) {
                continue;
            }
            statementSet.addInsertSql(sql);
        }
        return statementSet.execute();
    }
}

Maven dependencies

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.12.2</flink.version>
    <scala.version>2.11</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.2.4</version>
    </dependency>
</dependencies>

Source: 857Hub

Recommended reading:

Big Data Development: Building a Real-Time Data Warehouse with Flink SQL

A Comparison of the Flink, Spark, Storm, and Hadoop Frameworks

Big Data Development: Comparing Spark and Flink

