Added serializable and one more publisher

main
Victor Hans-Georg Waitz 2025-12-02 17:58:17 +01:00
parent b917ae613c
commit e71ade0ee7
7 changed files with 121 additions and 25 deletions

4
.gitignore vendored
View File

@ -1,3 +1,7 @@
/.idea/
/.mvn/
/paho*/
target/ target/
!.mvn/wrapper/maven-wrapper.jar !.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/ !**/src/main/**/target/

View File

@ -1,5 +1,7 @@
package vs.messagingmonitor; package vs.messagingmonitor;
import java.io.Serializable;
/** /**
* helper class for RFC 5424 (https://datatracker.ietf.org/doc/html/rfc5424) * helper class for RFC 5424 (https://datatracker.ietf.org/doc/html/rfc5424)
* compliant log messages as immutable Java objects - representation of a subset * compliant log messages as immutable Java objects - representation of a subset
@ -8,7 +10,8 @@ package vs.messagingmonitor;
* @author Sandro Leuchter * @author Sandro Leuchter
* *
*/ */
public abstract class AsciiChars { public abstract class AsciiChars implements Serializable {
private static final long serialVersionUID = 1L;
private final String value; private final String value;
public String value() { public String value() {
@ -41,36 +44,42 @@ public abstract class AsciiChars {
} }
static public final class L004 extends AsciiChars { static public final class L004 extends AsciiChars {
private static final long serialVersionUID = 2L;
public L004(String value) { public L004(String value) {
super(4, value); super(4, value);
} }
} }
static public final class L012 extends AsciiChars { static public final class L012 extends AsciiChars {
private static final long serialVersionUID = 3L;
public L012(String value) { public L012(String value) {
super(12, value); super(12, value);
} }
} }
static public final class L032 extends AsciiChars { static public final class L032 extends AsciiChars {
private static final long serialVersionUID = 4L;
public L032(String value) { public L032(String value) {
super(32, value); super(32, value);
} }
} }
static public final class L048 extends AsciiChars { static public final class L048 extends AsciiChars {
private static final long serialVersionUID = 5L;
public L048(String value) { public L048(String value) {
super(48, value); super(48, value);
} }
} }
static public final class L128 extends AsciiChars { static public final class L128 extends AsciiChars {
private static final long serialVersionUID = 6L;
public L128(String value) { public L128(String value) {
super(128, value); super(128, value);
} }
} }
static public final class L255 extends AsciiChars { static public final class L255 extends AsciiChars {
private static final long serialVersionUID = 7L;
public L255(String value) { public L255(String value) {
super(255, value); super(255, value);
} }

View File

@ -50,17 +50,13 @@ public class MqttReceiver {
@Override @Override
public void messageArrived(String topic, MqttMessage mqttMessage) { public void messageArrived(String topic, MqttMessage mqttMessage) {
System.out.println("Test");
System.out.println("Received: " + mqttMessage.toString()); System.out.println("Received: " + mqttMessage.toString());
byte[] messageData = mqttMessage.getPayload(); byte[] messageData = mqttMessage.getPayload();
SyslogMessage syslogMsg; SyslogMessage syslogMsg;
System.out.println("After getting payload");
try { try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(messageData)); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(messageData));
System.out.println("After converting to stream");
Object receivedObject = ois.readObject(); Object receivedObject = ois.readObject();
@ -70,8 +66,8 @@ public class MqttReceiver {
} }
syslogMsg = (SyslogMessage) receivedObject; syslogMsg = (SyslogMessage) receivedObject;
if (!(syslogMsg.sev().ordinal() <= SyslogMessage.Severity.WARNING.ordinal())) { if (syslogMsg.sev().ordinal() <= SyslogMessage.Severity.WARNING.ordinal()) {
WebappWebsocket.sendMessage("Channel " + channel + ": " + syslogMsg.toString()); WebappWebsocket.sendMessage("Channel " + channel + ": " + syslogMsg.toString() + "\n");
} }
} }
catch (ClassNotFoundException | IOException e) { catch (ClassNotFoundException | IOException e) {

View File

@ -3,8 +3,7 @@ package vs.messagingmonitor;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Date;
import vs.messagingmonitor.AsciiChars.L032; import vs.messagingmonitor.AsciiChars.L032;
import vs.messagingmonitor.AsciiChars.L048; import vs.messagingmonitor.AsciiChars.L048;
import vs.messagingmonitor.AsciiChars.L128; import vs.messagingmonitor.AsciiChars.L128;
@ -12,12 +11,13 @@ import vs.messagingmonitor.AsciiChars.L255;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import vs.messagingmonitor.StructuredData.Element;
import vs.messagingmonitor.StructuredData.Param;
class Publisher { class Publisher1 {
public static void main(String[] args) { public static void main(String[] args) {
String broker = args[0]; String broker = "tcp://localhost:1883";
System.out.println("Broker: " + broker); String topic = "messagingMonitor1337/t";
String topic = args[1];
System.out.println("Topic: " + topic); System.out.println("Topic: " + topic);
@ -27,15 +27,21 @@ class Publisher {
client = new MqttClient(broker, clientId); client = new MqttClient(broker, clientId);
client.connect(); client.connect();
var sd = new StructuredData()//
.add(Element.newTimeQuality(true, true))
.add(new Element("exampleSDID@32473").add(new Param("iut", "3"))
.add(new Param("eventSource", "Application")).add(new Param("eventID", "1011")))
.add(new Element("examplePriority@32473").add(new Param("class", "high")));
SyslogMessage.TextMessage textMessage = new SyslogMessage.TextMessage("message"); SyslogMessage.TextMessage textMessage = new SyslogMessage.TextMessage("message");
SyslogMessage syslogMessage = new SyslogMessage( SyslogMessage syslogMessage = new SyslogMessage(
SyslogMessage.Facility.USER, SyslogMessage.Facility.USER,
SyslogMessage.Severity.INFORMATIONAL, SyslogMessage.Severity.EMERGENCY,
new L255("192.168.2.100"), new L255("192.168.2.100"),
new L048("java"), new L048("java"),
new L128("1"), new L128("1"),
new L032("1"), new L032("1"),
new StructuredData(new ArrayList<>()), sd,
textMessage textMessage
); );
@ -44,16 +50,20 @@ class Publisher {
ObjectOutputStream oos = new ObjectOutputStream(baos); ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(syslogMessage); oos.writeObject(syslogMessage);
oos.flush(); oos.flush();
message.setPayload(baos.toByteArray()); message.setPayload(baos.toByteArray());
//MqttMessage message = new MqttMessage();
//String s = message.toString();
//message.setPayload(s.getBytes());
client.publish(topic, message); client.publish(topic, message);
oos.close(); oos.close();
client.disconnect(); client.disconnect();
System.out.println("DONE!"); System.out.println("DONE!");
} catch (MqttException | IOException e) { } catch (MqttException |IOException e) {
System.err.println(e.getMessage()); e.printStackTrace(System.err);
} }
} }
} }

View File

@ -0,0 +1,69 @@
package vs.messagingmonitor;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import vs.messagingmonitor.AsciiChars.L032;
import vs.messagingmonitor.AsciiChars.L048;
import vs.messagingmonitor.AsciiChars.L128;
import vs.messagingmonitor.AsciiChars.L255;
import vs.messagingmonitor.StructuredData.Element;
import vs.messagingmonitor.StructuredData.Param;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
class Publisher2 {
public static void main(String[] args) {
String broker = "tcp://localhost:1883";
String topic = "messagingMonitor1337/g";
System.out.println("Topic: " + topic);
MqttClient client;
String clientId = MqttClient.generateClientId();
try {
client = new MqttClient(broker, clientId);
client.connect();
var sd = new StructuredData()//
.add(Element.newTimeQuality(true, true))
.add(new Element("exampleSDID@32473").add(new Param("iut", "3"))
.add(new Param("eventSource", "Application")).add(new Param("eventID", "1011")))
.add(new Element("examplePriority@32473").add(new Param("class", "high")));
SyslogMessage.TextMessage textMessage = new SyslogMessage.TextMessage("message");
SyslogMessage syslogMessage = new SyslogMessage(
SyslogMessage.Facility.USER,
SyslogMessage.Severity.EMERGENCY,
new L255("192.168.2.100"),
new L048("java"),
new L128("1"),
new L032("1"),
sd,
textMessage
);
MqttMessage message = new MqttMessage();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(syslogMessage);
oos.flush();
message.setPayload(baos.toByteArray());
//MqttMessage message = new MqttMessage();
//String s = message.toString();
//message.setPayload(s.getBytes());
client.publish(topic, message);
oos.close();
client.disconnect();
System.out.println("DONE!");
} catch (MqttException |IOException e) {
e.printStackTrace(System.err);
}
}
}

View File

@ -1,5 +1,6 @@
package vs.messagingmonitor; package vs.messagingmonitor;
import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -12,8 +13,11 @@ import java.util.List;
* @author Sandro Leuchter * @author Sandro Leuchter
* *
*/ */
public class StructuredData { public class StructuredData implements Serializable {
static public class Element { private static final long serialVersionUID = 8L;
static public class Element implements Serializable{
private static final long serialVersionUID = 9L;
private final String name; private final String name;
private List<Param> parameters; private List<Param> parameters;
@ -101,8 +105,10 @@ public class StructuredData {
} }
} }
static public class Param { static public class Param implements Serializable{
private static final long serialVersionUID = 10L;
private final String name; private final String name;
// name: printable US-ASCII string ^[@=\]\"\s]+ // name: printable US-ASCII string ^[@=\]\"\s]+
// "@" + private enterpise number "@\d+(\.\d+)*" // "@" + private enterpise number "@\d+(\.\d+)*"
private final String value; private final String value;

View File

@ -84,13 +84,14 @@ public class SyslogMessage implements Serializable {
DEBUG; DEBUG;
} }
public static interface Message { public static interface Message extends Serializable {
public Object message(); public Object message();
public int length(); public int length();
} }
public static class BinaryMessage implements Message { public static class BinaryMessage implements Message, Serializable {
private static final long serialVersionUID = 11L;
private Byte[] message; private Byte[] message;
public BinaryMessage(Byte[] message) { public BinaryMessage(Byte[] message) {
@ -113,7 +114,8 @@ public class SyslogMessage implements Serializable {
} }
} }
public static class TextMessage implements Message { public static class TextMessage implements Message, Serializable {
private static final long serialVersionUID = 12L;
private String message; // UTF8 private String message; // UTF8
public TextMessage(String message) { public TextMessage(String message) {