Flink Beispiel

main
SoftwareObservatorium 2023-06-19 15:13:10 +02:00
parent dba3977c7e
commit 61ab1bc73f
10 changed files with 7232 additions and 1 deletions

41
Flink/.gitignore vendored 100644
View File

@ -0,0 +1,41 @@
# ---> Eclipse
*.pydevproject
.metadata
.gradle
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
# Eclipse Core
.project
# External tool builders
.externalToolBuilders/
# Locally stored "Eclipse launch configurations"
*.launch
# CDT-specific
.cproject
# JDT-specific (Eclipse Java Development Tools)
.classpath
# Java annotation processor (APT)
.factorypath
# PDT-specific
.buildpath
# sbteclipse plugin
.target
# TeXlipse plugin
.texlipse

2
Flink/README.md 100644
View File

@ -0,0 +1,2 @@
# Flink

View File

@ -0,0 +1,93 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>de.hsma.informatik.bdea</groupId>
<artifactId>Flink</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Flink</name>
<properties>
<java.version>11</java.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>11</source> <!-- same as <java.version> -->
<target>11</target> <!-- same as <java.version> -->
</configuration>
</plugin>
</plugins>
</build>
<!-- <build> <pluginManagement> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target>
</configuration> <dependencies> <dependency> <groupId>org.eclipse.jdt</groupId>
<artifactId>ecj</artifactId> <version>3.24.0</version> </dependency> </dependencies>
</plugin> </plugins> </pluginManagement> </build> -->
<dependencies>
<!-- <dependency> <groupId>org.eclipse.jdt.core.compiler</groupId> <artifactId>ecj</artifactId>
<version>4.6.1</version> </dependency> -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,24 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
public class AmountsThreshold {
// bei den VM-Argumenten --add-opens=java.base/java.nio=ALL-UNNAMED setzen, wenn das
// Programm mit einer InaccessibleObjectException abstürzt
// und bei Flink 1.15 dann zusätzlich noch: --add-opens=java.base/java.util=ALL-UNNAMED
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
int threshold = 30;
amounts
.filter(a -> a > threshold)
.reduce((input, sum) -> input + sum)
//.writeAsXXX anstatt von print in Datei speichern
//.collect() im Programm weiterverwenden (vgl. Stream API)
.print();
}
}

View File

@ -0,0 +1,37 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class BasicDataStream {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
// Vorher mit nc 127.0.0.1 -kl 9995 den Eingabe-Stream in der Konsole öffnen
// Linux/Mac bringt netcat bereits mit, unter Win ggf. nachinstallieren
DataStream<String> amounts = env.socketTextStream("localhost", 9995);
int threshold = 30;
amounts
.map(a -> Integer.parseInt(a))
.map(i -> new Tuple2<Integer, Integer>(i, i * 2)) // i verdoppeln, wird aber im Bsp. nicht weiter genutzt
.returns(Types.TUPLE(Types.INT, Types.INT))
.keyBy(0)
.countWindowAll(2) // immer paarweise zusammenfassen
.reduce((x, y) -> new Tuple2<>(x.f0, x.f0 + y.f0)) // und die Paare aufsummieren
.returns(Types.TUPLE(Types.INT, Types.INT))
.print();
env.execute();
System.out.println("Ende von main erreicht.");
}
}

View File

@ -0,0 +1,30 @@
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
public class GlobalSort {
// bei den VM-Argumenten --add-opens=java.base/java.nio=ALL-UNNAMED setzen, wenn das
// Programm mit einer InaccessibleObjectException abstürzt
// und bei Flink 1.15 dann zusätzlich noch: --add-opens=java.base/java.util=ALL-UNNAMED
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataSet<Integer> amounts = env.fromElements(17, 4, 42, 41, 20, 3, 1, -1, 0,
-134, 29, 40, 50, 70, 80);
amounts
.map(i -> Tuple1.of(i))
.returns(Types.TUPLE(Types.INT)) // benötigt wegen "type erasure" (vgl. Generics)
.partitionByRange(0)
.sortPartition(0, Order.ASCENDING)
.writeAsText("/tmp/flink");
env.execute();
}
}

View File

@ -0,0 +1,36 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
// Vorher mit nc 127.0.0.1 -kl 9995 den Eingabe-Stream in der Konsole öffnen
// Linux/Mac bringt netcat bereits mit, unter Win ggf. nachinstallieren
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9995)
// .readTextFile("file:///tmp/faust.txt")
.flatMap((String sentence, Collector<Tuple2<String, Integer>> out) -> {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}}) .returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.seconds(21))
// .countWindow(5, 3)
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
}

View File

@ -0,0 +1,28 @@
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountExample {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> wordCounts = env
.readTextFile("file:///home/marcus/development/repositories/Hochschule_BDEA/BDEA_SS23/Flink/flink/src/main/resources/Faust.txt")
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
wordCounts.print();
}
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,7 @@ public class QueryApp {
try (CqlSession session = CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", 9042)) // = CqlSession.builder().build()
.withLocalDatacenter("datacenter1") // benötigt wenn contact point explizit angegeben
.withKeyspace("test") // unser keyspace (z.B. use test;)
.withKeyspace("test3") // unser keyspace (z.B. use test;)
.build()) {
ResultSet rs = session.execute("select * from professoren");
for (Row row : rs) {