Hadoop Vertiefung

main
SoftwareObservatorium 2023-04-04 15:09:47 +02:00
parent 7fa09c0377
commit 6e580698fd
6 changed files with 529 additions and 0 deletions

View File

@ -0,0 +1,7 @@
1,4,70
2,21,63
3,2,82
4,11,65
5,14,61
6,2,74
7,6,84

View File

@ -0,0 +1,15 @@
M,0,0,3
M,0,1,2
M,0,2,1
M,1,0,5
M,1,1,4
M,1,2,3
M,2,0,7
M,2,1,8
M,2,2,9
V,0,1
V,1,2
V,2,3

View File

@ -0,0 +1,12 @@
2012, 01, 01, 5
2012, 01, 02, 45
2012, 01, 03, 35
2012, 01, 04, 10
2001, 11, 01, 46
2001, 11, 02, 47
2001, 11, 03, 48
2001, 11, 04, 40
2005, 08, 20, 50
2005, 08, 21, 52
2005, 08, 22, 38
2005, 08, 23, 70

View File

@ -0,0 +1,237 @@
package de.hsma.bdea;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.log4j.BasicConfigurator;
// Formel und Bsp. von http://www.crashkurs-statistik.de/der-korrelationskoeffizient-nach-pearson
public class Korrelation {
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // Log4j Config oder ConfigFile in Resources Folder
System.setProperty("hadoop.home.dir", "/"); // für Hadoop 3.3.0
args = new String[] {"resources/corr.txt", "/tmp/output-avg/", "/tmp/output-sq/", "/tmp/output-sumRes/", "/tmp/output-corr/"};
// Job 1
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "average");
job.setNumReduceTasks(2);
job.setJarByClass(Korrelation.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(AverageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
// -----------------------
// Job 2
conf = new Configuration();
job = Job.getInstance(conf, "squaring");
job.setNumReduceTasks(2);
job.setJarByClass(Korrelation.class);
job.setMapperClass(SquareMapper.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
outputPath = new Path(args[2]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
// -----------------------
// Job 3
conf = new Configuration();
job = Job.getInstance(conf, "sumResults");
job.setNumReduceTasks(2);
job.setJarByClass(Korrelation.class);
MultipleInputs.addInputPath(job, new Path(args[1]), SequenceFileInputFormat.class, Mapper.class);
MultipleInputs.addInputPath(job, new Path(args[2]), SequenceFileInputFormat.class, Mapper.class);
job.setReducerClass(DoubleSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
outputPath = new Path(args[3]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
// -----------------------
// Job 4
conf = new Configuration();
job = Job.getInstance(conf, "finalize");
job.setNumReduceTasks(1);
job.setJarByClass(Korrelation.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(FinalReducer.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(args[3]));
outputPath = new Path(args[4]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
System.exit(0);
}
// -------------------------------
// Job 1
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
private final static DoubleWritable dw = new DoubleWritable();
private Text[] elem = {new Text("xvg"), new Text("yvg")};
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] spalten = value.toString().split(",");
for (int i = 1; i < 3; i++) {
dw.set(Double.parseDouble(spalten[i]));
context.write(elem[i - 1], dw);
}
}
}
public static class AverageReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0;
long cnt = 0;
for (DoubleWritable val : values) {
sum += val.get();
cnt++;
}
result.set(sum/cnt);
context.write(key, result);
}
}
// ---------
// Job 2
public static class SquareMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
private final static DoubleWritable dw = new DoubleWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
double xsq = 0;
double ysq = 0;
double sum = 0;
int cnt = 1;
String[] spalten = value.toString().split(",");
double x = Double.parseDouble(spalten[1]);
double y = Double.parseDouble(spalten[2]);
xsq += x*x;
ysq += y*y;
sum += x*y;
dw.set(xsq);
context.write(new Text("xsq"), dw);
dw.set(ysq);
context.write(new Text("ysq"), dw);
dw.set(sum);
context.write(new Text("sum"), dw);
dw.set(cnt);
context.write(new Text("cnt"), dw);
}
}
// ---------
// Job 3
public static class DoubleSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0;
for (DoubleWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
// --------
// Job 4
public static class FinalReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
private Map<String, Double> map = new HashMap<>();
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
for (DoubleWritable val : values) {
map.put(key.toString(), val.get());
}
}
public void cleanup(Context context) throws IOException, InterruptedException {
long cnt = (long) (double) map.get("cnt");
double r = (map.get("sum") - (cnt * map.get("xvg") * map.get("yvg"))) /
(Math.sqrt(map.get("xsq") - cnt * map.get("xvg") * map.get("xvg"))
* Math.sqrt(map.get("ysq") - cnt * map.get("yvg") * map.get("yvg")));
result.set(r);
context.write(new Text("correlation"), result);
}
}
}

View File

@ -0,0 +1,158 @@
package de.hsma.bdea;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.log4j.BasicConfigurator;
//10.0
//22.0
//50.0
public class MVM2 {
static int length;
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // Log4j Config oder ConfigFile in Resources Folder
System.setProperty("hadoop.home.dir", "/"); // für Hadoop 3.3.0
args = new String[] {"resources/mvm.txt", "resources/mvm-output-1" + System.currentTimeMillis(), "resources/mvm-output-2" + System.currentTimeMillis()};
Configuration conf = new Configuration();
// conf.set("length", "3");
length = 3; // ggf. mit einem vorherigen Job ermitteln
Job job = Job.getInstance(conf, "mvm1");
job.setJarByClass(MVM2.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(MVMMultiplier.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
// Job 2
conf = new Configuration();
// conf.set("length", "3");
job = Job.getInstance(conf, "mvm2");
job.setJarByClass(MVM2.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(MVMReducer.class);
job.setNumReduceTasks(2);
job.setPartitionerClass(VectorPartitioner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, outputPath);
job.setInputFormatClass(SequenceFileInputFormat.class);
outputPath = new Path(args[2]);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
System.exit(0);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
if ("".equals(line)) {
return;
}
// System.out.println("+++ " + context.getTaskAttemptID().getTaskID().getId() + " - " + value.toString());
// int length = Integer.parseInt(context.getConfiguration().get("length"));
String k = null;
String v = null;
StringTokenizer st = new StringTokenizer(line, ",");
st.nextToken(); // ignore M or V
if (line.startsWith("M")) {
k = st.nextToken() + "," + st.nextToken();
v = st.nextToken();
context.write(new Text(k), new IntWritable(Integer.parseInt(v)));
// System.out.println(k + " " + v);
} else {
k = st.nextToken();
v = st.nextToken();
for (int i = 0; i < length; i++) {
// System.out.println(i+"," + k + " " + v);
context.write(new Text(i + "," + k), new IntWritable(Integer.parseInt(v)));
}
}
}
}
public static class MVMMultiplier extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
private Text newKey = new Text();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// System.out.println("mmmmkkkkkk " + key.toString());
Iterator<IntWritable> it = values.iterator();
result.set(it.next().get() * it.next().get());
String s = key.toString();
newKey.set(s.substring(0, s.indexOf(",")));
// System.out.println("NK: " + newKey.toString() + " -> " + result.get());
context.write(newKey, result);
}
}
public static class MVMReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// System.out.println("kkkkkkk " + key);
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
// System.out.println("vvvvvv " + val.get());
}
result.set(sum);
context.write(key, result);
}
}
public static class VectorPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
double intervalSize = Math.ceil((double)length / numPartitions);
return Integer.parseInt(key.toString()) / (int)intervalSize;
}
}
}

View File

@ -0,0 +1,100 @@
package de.hsma.bdea;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
public class SecondarySorting {
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // Log4j Config oder ConfigFile in Resources Folder
System.setProperty("hadoop.home.dir", "/"); // für Hadoop 3.3.0
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "secosort");
job.setJarByClass(SecondarySorting.class);
job.setMapperClass(TokenizerMapper.class);
job.setNumReduceTasks(2);
job.setPartitionerClass(MyPartitioner.class);
job.setGroupingComparatorClass(MyGroupingComparator.class);
job.setReducerClass(TempReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("resources/secosort.txt"));
Path outputPath = new Path("resources/secosort-output-" + System.currentTimeMillis());
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString(), ", ");
String yearMonth = st.nextToken() + "-" + st.nextToken();
st.nextToken(); // ignore day
String temp = st.nextToken();
System.out.println(new Text(yearMonth + ":" + "000".substring(0, 3-temp.length()) + temp).toString() + " -> " + value);
context.write(new Text(yearMonth + ":" + "000".substring(0, 3-temp.length()) + temp), value);
}
}
public static class MyPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text text, int numberOfPartitions) {
return Math.abs(key.toString().substring(0,7).hashCode() % numberOfPartitions);
}
}
public static class MyGroupingComparator extends WritableComparator {
public MyGroupingComparator() {
super(Text.class, true);
}
public int compare(WritableComparable wc1, WritableComparable wc2) {
String s1 = ((Text)wc1).toString();
String s2 = ((Text)wc2).toString();
return s1.substring(0,7).compareTo(s2.substring(0, 7));
}
}
public static class TempReducer extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text val : values) {
String s = val.toString();
System.out.println(s);
sb.append(s.substring(s.lastIndexOf(",") + 1, s.length()).trim() + " ");
}
result.set(sb.toString());
context.write(new Text(key.toString().substring(0, 7)), result);
}
}
}