SingleOutputStreamOperator<Snapshot> operator = singleOutputStreamOperator.keyBy(new KeySelector<Snapshot, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(Snapshot snapshot) throws Exception {
return Tuple2.of(snapshot.getDateTime(), snapshot.getHsSecurityId());
}).flatMap(new SnapshotProcessFunction());
在写
Flink程序的时候(以最简单的WordCount案例为例),有时会使用
Lambda表达式来简化,如下边程序中的flatMap算子和Map算子处,都是用了
Lambda表达式来简写:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
记个错误:
The generic type parameters of 'Tuple4' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
这个是因为我在fl...
<groupId>org.apache.
flink</groupId>
<artifactId>
flink-java</artifactId>
<version>1.9.0</version>
</d...
Java 8引入了一些新的语言功能,旨在更快,更清晰地编码。它具有最重要的功能,即所谓的“ Lambda表达式”,为函数式编程打开了大门。Lambda表达式允许以直接方式实现和传递函数,而无需声明其他(匿名)类。
注意: Flink支持对Java API的所有运算符使用lambda表达式,但是,每当lambda表达式使用Java泛型时,都需要显式声明类型信息。
二.范例与限制
下面的示例说明如何实现一个简单的内联map()函数,该函数使用lambda表达式对输入进行平方。函数的输入i和输出参数的类
Java 8引入了一些新的语言特性,旨在更快、更清晰地编码。通过最重要的特性,所谓的“Lambda表达式”,它打开了函数式编程的大门。Lambda表达式允许以一种直接的方式实现和传递函数,而无需声明其他(匿名)类。Flink支持对Java API的所有操作符使用lambda表达式,但是,每当lambda表达式使用Java泛型时,您需要显式声明类型信息。
举个栗子,看下returns如何使用:
本文档展示了如何使用lambda表达式,并描述了当前的限制。关于Flink API的一般介绍,请参阅DataStea
Map: 对于简单的map(i -> i * i) flink 可以猜测其 类型。复杂的则需要指定return type,或者构造一个MapFunction,或者extends 自 Tuple2<Integer, Integer>。
flatMap:对于flatMap 的支持是无法猜测出来 类型的,必须通过returns(Types.STRING) 指定具体的返回值类型。
package myflink.learn.lambda;
* Created by:
* date:
Java 8引入了一些新的语言特性,旨在更快更清晰地编码。Java 8最重要的特性,即所谓的“Lambda表达式”,为函数式编程打开了大门。Lambda表达式允许以一种直接的方式实现和传递函数,而不需要声明额外的(匿名的)类。
但是,当lambda表达式使用Java泛型时,您需要显式声明类型信息,Flink支持对Jav...
楼主在使用Java的lambda表达式,进行flink编程时报错,如下:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing
Exception in thread "main" org.apa...
Flink Java API不像Scala API可以随便写lambda表达式,写完以后需要使用returns方法显式指定返回值类型,否则会报下面错误,大概意思就是说Java的lambda表达式不能提供足够的类型信息,需要指定返回值类型。不推荐使用lambda表达式而是使用匿名类。
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please
Expected type Sequence[Union[float, tuple[float, float]]] , got tuple[tuple, ...] instead解决方法
这个错误信息表明你在一个期望接收类型为`Sequence[Union[float, tuple[float, float]]]`的函数或变量中,错误地提供了一个`tuple[tuple, ...]`类型的对象。简而言之,你提供了一个元组的元组,而不是一个浮点数或者由两个浮点数组成的元组的序列。
为了解决这个问题,你需要确保你提供的数据结构符合函数或变量所期望的类型。下面是一些可能的解决方法:
1. 确认你的数据结构:确保你正试图传递给函数的序列包含正确的元素类型。每个元素要么是一个浮点数,要么是一个由两个浮点数组成的元组。
2. 转换数据类型:如果你有一个元组的元组,而你需要的是一个元素类型为浮点数或浮点数元组的列表或序列,你可能需要将数据结构转换为合适的类型。例如,如果你有一个`tuple[tuple, ...]`类型的数据,你可以通过列表推导式将其转换为所需的类型:
```python
original_data = ... # 你的元组的元组
corrected_data = [element for tuple_pair in original_data for element in tuple_pair if isinstance(element, float)]
3. 检查函数或变量的定义:确保你调用的函数或你正在赋值的变量确实期望接收这个类型。如果它应该是`Sequence[Union[float, tuple[float, float]]]`类型,那么你的数据应该与之匹配。如果不是,你可能需要修改函数或变量的定义以匹配你的数据结构。
4. 检查函数或方法的使用:如果你在使用一个库或框架,确保你遵循了正确的API用法,包括数据结构的正确传递。
请注意,具体的解决方法取决于你的具体代码和上下文。需要查看完整的代码和错误发生的具体位置来提供一个准确的解决方案。