hadoop examples

main
Marcus Kessel 2024-04-03 10:20:19 +02:00
parent 0c71af8b1c
commit db7de9cc8f
20 changed files with 141982 additions and 0 deletions

36
.gitignore vendored 100644

@ -0,0 +1,36 @@
ignite/
activemq-data/
.idea/
target/
.classpath
.project
.settings/
*.aux
*.glo
*.idx
*.log
*.toc
*.ist
*.acn
*.acr
*.alg
*.bbl
*.blg
*.dvi
*.glg
*.gls
*.ilg
*.ind
*.lof
*.lot
*.maf
*.mtc
*.mtc1
*.out
*.synctex.gz
*.scg
#IntelliJ
*.iml
/.idea/


@ -0,0 +1,78 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>de.hsma.bdea</groupId>
<artifactId>Hadoop</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<hadoop.version>3.4.0</hadoop.version>
<slf4j.version>1.7.36</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>de.hsma.bdea.CardCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1 @@
/*output*


@ -0,0 +1,7 @@
1,4,70
2,21,63
3,2,82
4,11,65
5,14,61
6,2,74
7,6,84

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large


@ -0,0 +1,15 @@
M,0,0,3
M,0,1,2
M,0,2,1
M,1,0,5
M,1,1,4
M,1,2,3
M,2,0,7
M,2,1,8
M,2,2,9
V,0,1
V,1,2
V,2,3


@ -0,0 +1,12 @@
2012, 01, 01, 5
2012, 01, 02, 45
2012, 01, 03, 35
2012, 01, 04, 10
2001, 11, 01, 46
2001, 11, 02, 47
2001, 11, 03, 48
2001, 11, 04, 40
2005, 08, 20, 50
2005, 08, 21, 52
2005, 08, 22, 38
2005, 08, 23, 70


@ -0,0 +1,25 @@
package de.hsma.bdea;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
/**
 * Card generator for "karten.csv" (this variant writes to "resources/karten2.csv").
 */
public class BigDataKartenGenerator {
public static void main(String[] args) throws FileNotFoundException {
String[] farben = {"Kreuz", "Pik", "Herz", "Karo"};
String[] werte = {"2", "3", "4", "5", "6", "7", "8", "9", "10", "Bube", "Dame", "König", "As"};
System.setOut(new PrintStream(new FileOutputStream("resources/karten2.csv")));
for (int i = 0; i < 123294; i++) {
System.out.println(i + ", " + farben[(int)(Math.random() * 4)] + ", " + werte[(int)(Math.random() * 13)]);
}
}
}


@ -0,0 +1,62 @@
package de.hsma.bdea;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
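// Input: resources/karten.csv, presumably produced by BigDataKartenGenerator,
// i.e. lines of the form "4711, Herz, Dame". Note that split(",") keeps the
// leading blank, so the emitted keys are " Kreuz", " Pik", " Herz" and " Karo".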
public class CardCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text color = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] spalten = value.toString().split(",");
color.set(spalten[1]);
context.write(color, one); // emit (color, 1) as the mapper's output
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // configure Log4j programmatically, or put a config file into the resources folder
System.setProperty("hadoop.home.dir", "/"); // required for Hadoop 3.x
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "card count");
job.setJarByClass(CardCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // local reduce on the mapper side
job.setReducerClass(IntSumReducer.class); // reduce on the reducer side after the shuffle
job.setNumReduceTasks(1);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("resources/karten.csv"));
FileOutputFormat.setOutputPath(job, new Path("resources/karten-output" + System.currentTimeMillis()));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


@ -0,0 +1,28 @@
package de.hsma.bdea;
import java.util.Arrays;
public class JavaMVM {
public static void main(String[] args) {
double[] vector = {1, 2, 3};
double[][] matrix = {
{3, 2, 1},
{5, 4, 3},
{7, 8, 9}
};
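// For this matrix and vector the product is [10.0, 22.0, 50.0],
// matching the expected values noted at the top of MVM2.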
double[] result = new double[vector.length];
for (int i = 0; i < matrix.length; i++) {
double sum = 0;
for (int j = 0; j < matrix.length; j++) {
sum += matrix[i][j] * vector[j];
}
result[i] = sum;
}
System.out.println(Arrays.toString(result));
}
}


@ -0,0 +1,237 @@
package de.hsma.bdea;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.log4j.BasicConfigurator;
// Formula and example from http://www.crashkurs-statistik.de/der-korrelationskoeffizient-nach-pearson
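// The four chained jobs below compute Pearson's r from running sums:
//   r = (sum(x*y) - n*avg(x)*avg(y))
//       / ( sqrt(sum(x^2) - n*avg(x)^2) * sqrt(sum(y^2) - n*avg(y)^2) )
// Job 1 computes avg(x) and avg(y); Job 2 emits x^2, y^2, x*y and a count per input line;
// Job 3 sums those partial values; Job 4 plugs everything into the formula (FinalReducer.cleanup).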
public class Korrelation {
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // configure Log4j programmatically, or put a config file into the resources folder
System.setProperty("hadoop.home.dir", "/"); // required for Hadoop 3.x
args = new String[] {"resources/corr.txt", "/tmp/output-avg/", "/tmp/output-sq/", "/tmp/output-sumRes/", "/tmp/output-corr/"};
// Job 1
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "average");
job.setNumReduceTasks(2);
job.setJarByClass(Korrelation.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(AverageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
// -----------------------
// Job 2
conf = new Configuration();
job = Job.getInstance(conf, "squaring");
job.setNumReduceTasks(2);
job.setJarByClass(Korrelation.class);
job.setMapperClass(SquareMapper.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
outputPath = new Path(args[2]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
// -----------------------
// Job 3
conf = new Configuration();
job = Job.getInstance(conf, "sumResults");
job.setNumReduceTasks(2);
job.setJarByClass(Korrelation.class);
MultipleInputs.addInputPath(job, new Path(args[1]), SequenceFileInputFormat.class, Mapper.class);
MultipleInputs.addInputPath(job, new Path(args[2]), SequenceFileInputFormat.class, Mapper.class);
job.setReducerClass(DoubleSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
outputPath = new Path(args[3]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
// -----------------------
// Job 4
conf = new Configuration();
job = Job.getInstance(conf, "finalize");
job.setNumReduceTasks(1);
job.setJarByClass(Korrelation.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(FinalReducer.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(args[3]));
outputPath = new Path(args[4]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
System.exit(0);
}
// -------------------------------
// Job 1
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
private final static DoubleWritable dw = new DoubleWritable();
private Text[] elem = {new Text("xvg"), new Text("yvg")};
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] spalten = value.toString().split(",");
for (int i = 1; i < 3; i++) {
dw.set(Double.parseDouble(spalten[i]));
context.write(elem[i - 1], dw);
}
}
}
public static class AverageReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0;
long cnt = 0;
for (DoubleWritable val : values) {
sum += val.get();
cnt++;
}
result.set(sum/cnt);
context.write(key, result);
}
}
// ---------
// Job 2
public static class SquareMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
private final static DoubleWritable dw = new DoubleWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
double xsq = 0;
double ysq = 0;
double sum = 0;
int cnt = 1;
String[] spalten = value.toString().split(",");
double x = Double.parseDouble(spalten[1]);
double y = Double.parseDouble(spalten[2]);
xsq += x*x;
ysq += y*y;
sum += x*y;
dw.set(xsq);
context.write(new Text("xsq"), dw);
dw.set(ysq);
context.write(new Text("ysq"), dw);
dw.set(sum);
context.write(new Text("sum"), dw);
dw.set(cnt);
context.write(new Text("cnt"), dw);
}
}
// ---------
// Job 3
public static class DoubleSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0;
for (DoubleWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
// --------
// Job 4
public static class FinalReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
private Map<String, Double> map = new HashMap<>();
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
for (DoubleWritable val : values) {
map.put(key.toString(), val.get());
}
}
public void cleanup(Context context) throws IOException, InterruptedException {
long cnt = (long) (double) map.get("cnt");
double r = (map.get("sum") - (cnt * map.get("xvg") * map.get("yvg"))) /
(Math.sqrt(map.get("xsq") - cnt * map.get("xvg") * map.get("xvg"))
* Math.sqrt(map.get("ysq") - cnt * map.get("yvg") * map.get("yvg")));
result.set(r);
context.write(new Text("correlation"), result);
}
}
}


@ -0,0 +1,158 @@
package de.hsma.bdea;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.log4j.BasicConfigurator;
// expected result of M*v for resources/mvm.txt: 10, 22, 50
public class MVM2 {
static int length;
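// Number of matrix columns, shared with the mapper via this static field.
// This only works in local (single-JVM) mode; on a real cluster the value would
// have to be passed through the Configuration instead (see the commented-out
// conf.set("length", ...) / context.getConfiguration().get("length") lines below).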
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // configure Log4j programmatically, or put a config file into the resources folder
System.setProperty("hadoop.home.dir", "/"); // required for Hadoop 3.x
args = new String[] {"resources/mvm.txt", "resources/mvm-output-1" + System.currentTimeMillis(), "resources/mvm-output-2" + System.currentTimeMillis()};
Configuration conf = new Configuration();
// conf.set("length", "3");
length = 3; // number of matrix columns (== number of vector rows); could be determined by a preceding job if needed
Job job = Job.getInstance(conf, "mvm1");
job.setJarByClass(MVM2.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(MVMMultiplier.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
// Job 2
conf = new Configuration();
// conf.set("length", "3");
job = Job.getInstance(conf, "mvm2");
job.setJarByClass(MVM2.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(MVMReducer.class);
job.setNumReduceTasks(2);
job.setPartitionerClass(VectorPartitioner.class); // range-partitions by vector coordinate so the output stays ordered (optional)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, outputPath);
job.setInputFormatClass(SequenceFileInputFormat.class);
outputPath = new Path(args[2]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
System.exit(0);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
if ("".equals(line)) {
return;
}
// System.out.println("+++ " + context.getTaskAttemptID().getTaskID().getId() + " - " + value.toString());
// int length = Integer.parseInt(context.getConfiguration().get("length"));
String k = null;
String v = null;
StringTokenizer st = new StringTokenizer(line, ",");
st.nextToken(); // ignore M or V
if (line.startsWith("M")) {
k = st.nextToken() + "," + st.nextToken();
v = st.nextToken();
context.write(new Text(k), new IntWritable(Integer.parseInt(v)));
// System.out.println(k + " " + v);
} else {
k = st.nextToken();
v = st.nextToken();
for (int i = 0; i < length; i++) {
// System.out.println(i+"," + k + " " + v);
context.write(new Text(i + "," + k), new IntWritable(Integer.parseInt(v)));
}
}
}
}
public static class MVMMultiplier extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
private Text newKey = new Text();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// System.out.println("mmmmkkkkkk " + key.toString());
Iterator<IntWritable> it = values.iterator();
result.set(it.next().get() * it.next().get());
String s = key.toString();
newKey.set(s.substring(0, s.indexOf(",")));
// System.out.println("NK: " + newKey.toString() + " -> " + result.get());
context.write(newKey, result);
}
}
public static class MVMReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// System.out.println("kkkkkkk " + key);
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
// System.out.println("vvvvvv " + val.get());
}
result.set(sum);
context.write(key, result);
}
}
public static class VectorPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
double intervalSize = Math.ceil((double)length / numPartitions);
return Integer.parseInt(key.toString()) / (int)intervalSize;
}
}
}


@ -0,0 +1,100 @@
package de.hsma.bdea;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
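// Secondary sort: the mapper builds a composite key "yyyy-MM:ttt" (temperature
// zero-padded to three digits), so the framework's key sort orders temperatures
// within each month. MyPartitioner and MyGroupingComparator look only at the
// first 7 characters (the year-month prefix), so all records of one month arrive
// in a single reduce call, already sorted by temperature.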
public class SecondarySorting {
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // configure Log4j programmatically, or put a config file into the resources folder
System.setProperty("hadoop.home.dir", "/"); // required for Hadoop 3.x
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "secosort");
job.setJarByClass(SecondarySorting.class);
job.setMapperClass(TokenizerMapper.class);
job.setNumReduceTasks(2);
job.setPartitionerClass(MyPartitioner.class);
job.setGroupingComparatorClass(MyGroupingComparator.class);
job.setReducerClass(TempReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("resources/secosort.txt"));
Path outputPath = new Path("resources/secosort-output-" + System.currentTimeMillis());
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString(), ", ");
String yearMonth = st.nextToken() + "-" + st.nextToken();
st.nextToken(); // ignore day
String temp = st.nextToken();
System.out.println(new Text(yearMonth + ":" + "000".substring(0, 3-temp.length()) + temp).toString() + " -> " + value);
context.write(new Text(yearMonth + ":" + "000".substring(0, 3-temp.length()) + temp), value);
}
}
public static class MyPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text text, int numberOfPartitions) {
return Math.abs(key.toString().substring(0,7).hashCode() % numberOfPartitions);
}
}
public static class MyGroupingComparator extends WritableComparator {
public MyGroupingComparator() {
super(Text.class, true);
}
public int compare(WritableComparable wc1, WritableComparable wc2) {
String s1 = ((Text)wc1).toString();
String s2 = ((Text)wc2).toString();
return s1.substring(0,7).compareTo(s2.substring(0, 7));
}
}
public static class TempReducer extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text val : values) {
String s = val.toString();
System.out.println(s);
sb.append(s.substring(s.lastIndexOf(",") + 1, s.length()).trim() + " ");
}
result.set(sb.toString());
context.write(new Text(key.toString().substring(0, 7)), result);
}
}
}


@ -0,0 +1,99 @@
package de.hsma.bdea;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.log4j.BasicConfigurator;
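// Two chained jobs: job 1 counts words with 4 reducers and writes SequenceFiles;
// job 2 reads those files with an identity mapper and a single reducer, so the
// final output is one file, globally sorted by word (the framework sorts keys
// before the reduce phase).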
public class WordCountVL {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] woerter = value.toString().split("\\W+");
for (String wort: woerter) {
word.set(wort);
context.write(word, one);
}
}
}
public static class IdentityMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static class DoNothingReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, values.iterator().next());
}
}
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // configure Log4j programmatically, or put a config file into the resources folder
System.setProperty("hadoop.home.dir", "/"); // required for Hadoop 3.x
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountVL.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // local reduce on the mapper side
job.setReducerClass(IntSumReducer.class); // reduce on the reducer side after the shuffle
job.setNumReduceTasks(4);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("resources/klassiker"));
String output1 = "resources/wordcount-output1-" + System.currentTimeMillis();
FileOutputFormat.setOutputPath(job, new Path(output1));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.waitForCompletion(true);
// --- Ende Job 1 ---
job = Job.getInstance(conf, "word count sort");
job.setJarByClass(WordCountVL.class);
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(DoNothingReducer.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(output1+"/part-r-*"));
job.setInputFormatClass(SequenceFileInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("resources/wordcount-output2-" + System.currentTimeMillis()));
job.waitForCompletion(true);
}
}


@ -0,0 +1,99 @@
package de.hsma.bdea;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
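// Like WordCountVL, but additionally demonstrates Hadoop counters: the built-in
// REDUCE_OUTPUT_RECORDS counter and a custom "mygroup"/"words" counter that the
// reducer increments once per key. Because the combiner does not touch the
// custom counter, both values end up identical.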
public class WordCountVLCounter {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] woerter = value.toString().split("\\W+");
for (String wort: woerter) {
word.set(wort);
context.write(word, one);
}
}
}
public static class IntSumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
// we could also extend IntSumCombiner here to avoid duplicating the same reduce logic
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
// count distinct words: one increment per reduce call, i.e. per key
context.getCounter("mygroup", "words").increment(1);
}
}
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // configure Log4j programmatically, or put a config file into the resources folder
System.setProperty("hadoop.home.dir", "/"); // required for Hadoop 3.x
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountVLCounter.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumCombiner.class); // local reduce on the mapper side
job.setReducerClass(IntSumReducer.class); // reduce on the reducer side after the shuffle
job.setNumReduceTasks(4);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("resources/klassiker"));
String output1 = "resources/wordcount-output1-" + System.currentTimeMillis();
FileOutputFormat.setOutputPath(job, new Path(output1));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.waitForCompletion(true);
// -- Hadoop Counter
// built-in counter (see TaskCounter.* for others)
long outputRecords = job.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).getValue();
// custom counter (important: incremented only in the reducer, not in the combiner)
long words = job.getCounters().findCounter("mygroup", "words").getValue();
// both counts are identical: each reduce call writes exactly one record and increments the counter once
System.out.println("#Records => " + outputRecords);
System.out.println("#Words => " + words);
}
}

8
Hadoop/README.md 100644

@ -0,0 +1,8 @@
# HadoopSS24
Based on materials by Prof. Hummel (see https://git.informatik.hs-mannheim.de/o.hummel/HadoopSS22/).
Contains:
* CardCount
* WordCount
* WordCount + Counter