From 6e580698fd5208646135cf5aa9e385d435d24388 Mon Sep 17 00:00:00 2001
From: SoftwareObservatorium
Date: Tue, 4 Apr 2023 15:09:47 +0200
Subject: [PATCH] Hadoop Vertiefung

---
 HadoopSS23/Hadoop/resources/corr.txt          |   7 +
 HadoopSS23/Hadoop/resources/mvm.txt           |  15 ++
 HadoopSS23/Hadoop/resources/secosort.txt      |  12 +
 .../main/java/de/hsma/bdea/Korrelation.java   | 237 ++++++++++++++++++
 .../src/main/java/de/hsma/bdea/MVM2.java      | 158 ++++++++++++
 .../java/de/hsma/bdea/SecondarySorting.java   | 100 ++++++++
 6 files changed, 529 insertions(+)
 create mode 100644 HadoopSS23/Hadoop/resources/corr.txt
 create mode 100644 HadoopSS23/Hadoop/resources/mvm.txt
 create mode 100644 HadoopSS23/Hadoop/resources/secosort.txt
 create mode 100644 HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/Korrelation.java
 create mode 100644 HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/MVM2.java
 create mode 100644 HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/SecondarySorting.java

diff --git a/HadoopSS23/Hadoop/resources/corr.txt b/HadoopSS23/Hadoop/resources/corr.txt
new file mode 100644
index 0000000..2f1ec5b
--- /dev/null
+++ b/HadoopSS23/Hadoop/resources/corr.txt
@@ -0,0 +1,7 @@
+1,4,70
+2,21,63
+3,2,82
+4,11,65
+5,14,61
+6,2,74
+7,6,84
diff --git a/HadoopSS23/Hadoop/resources/mvm.txt b/HadoopSS23/Hadoop/resources/mvm.txt
new file mode 100644
index 0000000..f05fdc7
--- /dev/null
+++ b/HadoopSS23/Hadoop/resources/mvm.txt
@@ -0,0 +1,15 @@
+M,0,0,3
+M,0,1,2
+M,0,2,1
+
+M,1,0,5
+M,1,1,4
+M,1,2,3
+
+M,2,0,7
+M,2,1,8
+M,2,2,9
+
+V,0,1
+V,1,2
+V,2,3
\ No newline at end of file
diff --git a/HadoopSS23/Hadoop/resources/secosort.txt b/HadoopSS23/Hadoop/resources/secosort.txt
new file mode 100644
index 0000000..b9b3f4f
--- /dev/null
+++ b/HadoopSS23/Hadoop/resources/secosort.txt
@@ -0,0 +1,12 @@
+2012, 01, 01, 5
+2012, 01, 02, 45
+2012, 01, 03, 35
+2012, 01, 04, 10
+2001, 11, 01, 46
+2001, 11, 02, 47
+2001, 11, 03, 48
+2001, 11, 04, 40
+2005, 08, 20, 50
+2005, 08, 21, 52
+2005, 08, 22, 38
+2005, 08, 23, 70
\ No newline at end of file
diff --git a/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/Korrelation.java b/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/Korrelation.java
new file mode 100644
index 0000000..160fdd0
--- /dev/null
+++ b/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/Korrelation.java
@@ -0,0 +1,237 @@
+package de.hsma.bdea;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.log4j.BasicConfigurator;
+
+// Formula and example from http://www.crashkurs-statistik.de/der-korrelationskoeffizient-nach-pearson
+
+public class Korrelation {
+
+    public static void main(String[] args) throws Exception {
+        BasicConfigurator.configure(); // Log4j config, or put a config file in the resources folder
+        System.setProperty("hadoop.home.dir", "/"); // for Hadoop 3.3.0
+
+        args = new String[] {"resources/corr.txt", "/tmp/output-avg/", "/tmp/output-sq/", "/tmp/output-sumRes/", "/tmp/output-corr/"};
+
+        // Job 1
+        Configuration conf = new Configuration();
+        Job job = Job.getInstance(conf, "average");
+        job.setNumReduceTasks(2);
+
+        job.setJarByClass(Korrelation.class);
+        job.setMapperClass(TokenizerMapper.class);
+        job.setReducerClass(AverageReducer.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(DoubleWritable.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+
+        Path outputPath = new Path(args[1]);
+        FileOutputFormat.setOutputPath(job, outputPath);
+        outputPath.getFileSystem(conf).delete(outputPath, true);
+
+        job.waitForCompletion(true);
+
+        // -----------------------
+        // Job 2
+
+        conf = new Configuration();
+        job = Job.getInstance(conf, "squaring");
+        job.setNumReduceTasks(2);
+
+        job.setJarByClass(Korrelation.class);
+        job.setMapperClass(SquareMapper.class);
+        job.setReducerClass(Reducer.class); // identity reducer
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(DoubleWritable.class);
+
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+        outputPath = new Path(args[2]);
+        FileOutputFormat.setOutputPath(job, outputPath);
+        outputPath.getFileSystem(conf).delete(outputPath, true);
+
+        job.waitForCompletion(true);
+
+        // -----------------------
+        // Job 3
+
+        conf = new Configuration();
+        job = Job.getInstance(conf, "sumResults");
+        job.setNumReduceTasks(2);
+
+        job.setJarByClass(Korrelation.class);
+
+        MultipleInputs.addInputPath(job, new Path(args[1]), SequenceFileInputFormat.class, Mapper.class);
+        MultipleInputs.addInputPath(job, new Path(args[2]), SequenceFileInputFormat.class, Mapper.class);
+
+        job.setReducerClass(DoubleSumReducer.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(DoubleWritable.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+        outputPath = new Path(args[3]);
+        FileOutputFormat.setOutputPath(job, outputPath);
+        outputPath.getFileSystem(conf).delete(outputPath, true);
+
+        job.waitForCompletion(true);
+
+        // -----------------------
+        // Job 4
+
+        conf = new Configuration();
+        job = Job.getInstance(conf, "finalize");
+        job.setNumReduceTasks(1);
+
+        job.setJarByClass(Korrelation.class);
+        job.setMapperClass(Mapper.class); // identity mapper
+        job.setReducerClass(FinalReducer.class);
+
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(DoubleWritable.class);
+
+        FileInputFormat.addInputPath(job, new Path(args[3]));
+        outputPath = new Path(args[4]);
+        FileOutputFormat.setOutputPath(job, outputPath);
+        outputPath.getFileSystem(conf).delete(outputPath, true);
+
+        job.waitForCompletion(true);
+        System.exit(0);
+    }
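+
+    // -----------------------
+    // Taken together, the four chained jobs assemble Pearson's correlation coefficient
+    //
+    //                       sum - cnt * xvg * yvg
+    //     r = ----------------------------------------------------
+    //         sqrt(xsq - cnt * xvg^2) * sqrt(ysq - cnt * yvg^2)
+    //
+    // Job 1 computes the means xvg and yvg, Job 2 emits the per-row squares and
+    // products (xsq, ysq, sum) plus a row counter (cnt), Job 3 sums all of these,
+    // and Job 4 plugs the five totals into the formula (see FinalReducer below).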
+
+    // -------------------------------
+    // Job 1
+
+    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
+        private final static DoubleWritable dw = new DoubleWritable();
+        private Text[] elem = {new Text("xvg"), new Text("yvg")};
+
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String[] spalten = value.toString().split(",");
+
+            for (int i = 1; i < 3; i++) {
+                dw.set(Double.parseDouble(spalten[i]));
+                context.write(elem[i - 1], dw);
+            }
+        }
+    }
+
+    public static class AverageReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
+        private DoubleWritable result = new DoubleWritable();
+
+        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
+            double sum = 0;
+            long cnt = 0;
+
+            for (DoubleWritable val : values) {
+                sum += val.get();
+                cnt++;
+            }
+
+            result.set(sum / cnt);
+            context.write(key, result);
+        }
+    }
+
+    // ---------
+    // Job 2
+
+    public static class SquareMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
+        private final static DoubleWritable dw = new DoubleWritable();
+
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            double xsq = 0;
+            double ysq = 0;
+            double sum = 0;
+            int cnt = 1;
+
+            String[] spalten = value.toString().split(",");
+
+            double x = Double.parseDouble(spalten[1]);
+            double y = Double.parseDouble(spalten[2]);
+
+            xsq += x * x;
+            ysq += y * y;
+            sum += x * y;
+
+            dw.set(xsq);
+            context.write(new Text("xsq"), dw);
+            dw.set(ysq);
+            context.write(new Text("ysq"), dw);
+            dw.set(sum);
+            context.write(new Text("sum"), dw);
+            dw.set(cnt);
+            context.write(new Text("cnt"), dw);
+        }
+    }
+
+    // ---------
+    // Job 3
+
+    public static class DoubleSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
+        private DoubleWritable result = new DoubleWritable();
+
+        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
+            double sum = 0;
+            for (DoubleWritable val : values) {
+                sum += val.get();
+            }
+            result.set(sum);
+            context.write(key, result);
+        }
+    }
+
+    // --------
+    // Job 4
+
+    public static class FinalReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
+        private DoubleWritable result = new DoubleWritable();
+        private Map<String, Double> map = new HashMap<>();
+
+        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
+            for (DoubleWritable val : values) {
+                map.put(key.toString(), val.get());
+            }
+        }
+
+        public void cleanup(Context context) throws IOException, InterruptedException {
+            long cnt = (long) (double) map.get("cnt");
+
+            double r = (map.get("sum") - (cnt * map.get("xvg") * map.get("yvg"))) /
+                    (Math.sqrt(map.get("xsq") - cnt * map.get("xvg") * map.get("xvg"))
+                            * Math.sqrt(map.get("ysq") - cnt * map.get("yvg") * map.get("yvg")));
+            result.set(r);
+
+            context.write(new Text("correlation"), result);
+        }
+    }
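+
+    // Minimal in-memory cross-check (a sketch that assumes the input fits into
+    // memory): the same formula as in FinalReducer, applied directly to two
+    // arrays. For the seven rows of corr.txt it yields approximately r = -0.74.
+    static double pearson(double[] x, double[] y) {
+        double sum = 0, xsq = 0, ysq = 0, sx = 0, sy = 0;
+        int cnt = x.length;
+
+        for (int i = 0; i < cnt; i++) {
+            sx += x[i];             // running sums for the two means
+            sy += y[i];
+            sum += x[i] * y[i];     // sum of products
+            xsq += x[i] * x[i];     // sums of squares
+            ysq += y[i] * y[i];
+        }
+
+        double xvg = sx / cnt;
+        double yvg = sy / cnt;
+
+        return (sum - cnt * xvg * yvg)
+                / (Math.sqrt(xsq - cnt * xvg * xvg) * Math.sqrt(ysq - cnt * yvg * yvg));
+    }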
+
+}
diff --git a/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/MVM2.java b/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/MVM2.java
new file mode 100644
index 0000000..67d6464
--- /dev/null
+++ b/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/MVM2.java
@@ -0,0 +1,158 @@
+package de.hsma.bdea;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.log4j.BasicConfigurator;
+
+// Expected result (M*v, one component per row):
+// 10.0
+// 22.0
+// 50.0
+
+public class MVM2 {
+    static int length;
+
+    public static void main(String[] args) throws Exception {
+        BasicConfigurator.configure(); // Log4j config, or put a config file in the resources folder
+        System.setProperty("hadoop.home.dir", "/"); // for Hadoop 3.3.0
+
+        args = new String[] {"resources/mvm.txt", "resources/mvm-output-1" + System.currentTimeMillis(), "resources/mvm-output-2" + System.currentTimeMillis()};
+
+        Configuration conf = new Configuration();
+//        conf.set("length", "3");
+        length = 3; // if need be, determine this with a preceding job
+        Job job = Job.getInstance(conf, "mvm1");
+
+        job.setJarByClass(MVM2.class);
+        job.setMapperClass(TokenizerMapper.class);
+        job.setReducerClass(MVMMultiplier.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        Path outputPath = new Path(args[1]);
+        FileOutputFormat.setOutputPath(job, outputPath);
+        outputPath.getFileSystem(conf).delete(outputPath, true);
+
+        job.waitForCompletion(true);
+
+        // Job 2
+        conf = new Configuration();
+//        conf.set("length", "3");
+        job = Job.getInstance(conf, "mvm2");
+
+        job.setJarByClass(MVM2.class);
+        job.setMapperClass(Mapper.class); // identity mapper
+        job.setReducerClass(MVMReducer.class);
+        job.setNumReduceTasks(2);
+        job.setPartitionerClass(VectorPartitioner.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        FileInputFormat.addInputPath(job, outputPath);
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+
+        outputPath = new Path(args[2]);
+        FileOutputFormat.setOutputPath(job, outputPath);
+        outputPath.getFileSystem(conf).delete(outputPath, true);
+
+        job.waitForCompletion(true);
+        System.exit(0);
+    }
+
+    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
+
+        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+            String line = value.toString();
+            if ("".equals(line)) {
+                return;
+            }
+
+            // System.out.println("+++ " + context.getTaskAttemptID().getTaskID().getId() + " - " + value.toString());
+//            int length = Integer.parseInt(context.getConfiguration().get("length"));
+
+            String k = null;
+            String v = null;
+
+            StringTokenizer st = new StringTokenizer(line, ",");
+
+            st.nextToken(); // ignore M or V
+
+            if (line.startsWith("M")) {
+                k = st.nextToken() + "," + st.nextToken(); // row,col
+                v = st.nextToken();
+                context.write(new Text(k), new IntWritable(Integer.parseInt(v)));
+                // System.out.println(k + " " + v);
+            } else {
+                k = st.nextToken(); // vector index
+                v = st.nextToken();
+
+                // replicate the vector component to every matrix row
+                for (int i = 0; i < length; i++) {
+                    // System.out.println(i + "," + k + " " + v);
+                    context.write(new Text(i + "," + k), new IntWritable(Integer.parseInt(v)));
+                }
+            }
+        }
+    }
+
+    public static class MVMMultiplier extends Reducer<Text, IntWritable, Text, IntWritable> {
+        private IntWritable result = new IntWritable();
+        private Text newKey = new Text();
+
+        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
+//            System.out.println("mmmmkkkkkk " + key.toString());
+
+            Iterator<IntWritable> it = values.iterator();
+            result.set(it.next().get() * it.next().get()); // m[i][j] * v[j]
+
+            String s = key.toString();
+            newKey.set(s.substring(0, s.indexOf(","))); // keep only the row index
+//            System.out.println("NK: " + newKey.toString() + " -> " + result.get());
+            context.write(newKey, result);
+        }
+    }
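+
+    // -----------------------
+    // Dataflow of the two jobs: TokenizerMapper keys every matrix entry m[i][j]
+    // and a copy of every vector component v[j] under "i,j", so MVMMultiplier
+    // receives exactly two values per key and emits (i, m[i][j] * v[j]).
+    // MVMReducer below then sums those products per row i; e.g. for row 0 of
+    // mvm.txt: 3*1 + 2*2 + 1*3 = 10.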
+
+    public static class MVMReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        private IntWritable result = new IntWritable();
+
+        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
+//            System.out.println("kkkkkkk " + key);
+            int sum = 0;
+            for (IntWritable val : values) {
+                sum += val.get();
+//                System.out.println("vvvvvv " + val.get());
+            }
+            result.set(sum);
+            context.write(key, result);
+        }
+    }
+
+    public static class VectorPartitioner extends Partitioner<Text, IntWritable> {
+        @Override
+        public int getPartition(Text key, IntWritable value, int numPartitions) {
+            // contiguous ranges of row indices per reducer
+            double intervalSize = Math.ceil((double) length / numPartitions);
+
+            return Integer.parseInt(key.toString()) / (int) intervalSize;
+        }
+    }
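+
+    // Minimal in-memory cross-check (a sketch that assumes the dense 3x3 sample
+    // from mvm.txt): M = {{3,2,1},{5,4,3},{7,8,9}} times v = (1,2,3) gives
+    // (10, 22, 50), matching the expected result noted at the top of this file.
+    static int[] multiply(int[][] m, int[] v) {
+        int[] result = new int[m.length];
+
+        for (int i = 0; i < m.length; i++) {
+            for (int j = 0; j < v.length; j++) {
+                result[i] += m[i][j] * v[j]; // same product/sum split as jobs mvm1/mvm2
+            }
+        }
+
+        return result;
+    }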
+
+}
diff --git a/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/SecondarySorting.java b/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/SecondarySorting.java
new file mode 100644
index 0000000..7248e21
--- /dev/null
+++ b/HadoopSS23/Hadoop/src/main/java/de/hsma/bdea/SecondarySorting.java
@@ -0,0 +1,100 @@
+package de.hsma.bdea;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.BasicConfigurator;
+
+public class SecondarySorting {
+
+    public static void main(String[] args) throws Exception {
+        BasicConfigurator.configure(); // Log4j config, or put a config file in the resources folder
+        System.setProperty("hadoop.home.dir", "/"); // for Hadoop 3.3.0
+
+        Configuration conf = new Configuration();
+        Job job = Job.getInstance(conf, "secosort");
+
+        job.setJarByClass(SecondarySorting.class);
+        job.setMapperClass(TokenizerMapper.class);
+
+        job.setNumReduceTasks(2);
+        job.setPartitionerClass(MyPartitioner.class);
+        job.setGroupingComparatorClass(MyGroupingComparator.class);
+        job.setReducerClass(TempReducer.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+
+        FileInputFormat.addInputPath(job, new Path("resources/secosort.txt"));
+        Path outputPath = new Path("resources/secosort-output-" + System.currentTimeMillis());
+        FileOutputFormat.setOutputPath(job, outputPath);
+        outputPath.getFileSystem(conf).delete(outputPath, true);
+
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+
+    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text> {
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            StringTokenizer st = new StringTokenizer(value.toString(), ", ");
+
+            String yearMonth = st.nextToken() + "-" + st.nextToken();
+            st.nextToken(); // ignore day
+            String temp = st.nextToken();
+
+            // composite key "yyyy-MM:ttt" with the temperature zero-padded to three digits
+            System.out.println(new Text(yearMonth + ":" + "000".substring(0, 3 - temp.length()) + temp).toString() + " -> " + value);
+            context.write(new Text(yearMonth + ":" + "000".substring(0, 3 - temp.length()) + temp), value);
+        }
+    }
+
+    public static class MyPartitioner extends Partitioner<Text, Text> {
+        @Override
+        public int getPartition(Text key, Text text, int numberOfPartitions) {
+            // partition on the year-month prefix only, so one month never spans two reducers
+            return Math.abs(key.toString().substring(0, 7).hashCode() % numberOfPartitions);
+        }
+    }
+
+    public static class MyGroupingComparator extends WritableComparator {
+        public MyGroupingComparator() {
+            super(Text.class, true);
+        }
+
+        public int compare(WritableComparable wc1, WritableComparable wc2) {
+            String s1 = ((Text) wc1).toString();
+            String s2 = ((Text) wc2).toString();
+
+            // group by the year-month prefix; the temperature suffix still drives the sort order
+            return s1.substring(0, 7).compareTo(s2.substring(0, 7));
+        }
+    }
+
+    public static class TempReducer extends Reducer<Text, Text, Text, Text> {
+        private Text result = new Text();
+
+        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+            StringBuilder sb = new StringBuilder();
+
+            for (Text val : values) {
+                String s = val.toString();
+                System.out.println(s);
+                sb.append(s.substring(s.lastIndexOf(",") + 1, s.length()).trim() + " ");
+            }
+
+            result.set(sb.toString());
+
+            context.write(new Text(key.toString().substring(0, 7)), result);
+        }
+    }
+
+}
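+
+// With secosort.txt as input, every reduce group is one year-month whose values
+// arrive sorted by the zero-padded temperature in the composite key, so each
+// output line should list that month's temperatures in ascending order:
+//
+//   2001-11    40 46 47 48
+//   2005-08    38 50 52 70
+//   2012-01    5 10 35 45
+//
+// (How the lines are distributed across the two reducer outputs depends on
+// MyPartitioner's hash.)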