Apache ActiveMQ is the most popular and powerful open source messaging, message oriented broker. It supports many cross language clients. You can use ActiveMQ with any language(C, C++, Java..), any protocols (TCP, UDP, Openwire, Rest…). Also you can manage queue and asynchronous messaging.
ActiveMQ has 3 main mechanism:
Broker: It gets message from producers and send it to the consumers. Broker can be used as embedded or ActiveMQ server can be used as broker.
Producer:It creates message and send it to the broker.
Consumer: It gets message from broker and process it.

You can also use topic and queue to send message to receiver from sender.


In this post, we will learn how to use ActiveMQ with Java SWT Applications. I downloaded ActiveMQ and installed it on Linux and Windows OS. You can reach ActiveMQ Installion from this link. You can run ActiveMQ in linux using this commands.
cd {ActiveMq Directory}/bin
./activemq start # to start activemq
netstat -an|grep 61616 # to show the 61616 listening port
./activemq stop # to stop activemq
Also, you can see configuration and details in browser using http://localhost:8161/admin/. Username: admin; password: admin.
I have two Java SWT application: Module 1 and Module 2. In scenario, Module 2 sents number to Module 1 using ActiveMq.

To use code, you should add external jars: javax.jms-3.1.1.jar, openwire-example-0.1-SNAPSHOT.jar.
GitHUB Links:
https://github.com/omerbsezer/Module1
https://github.com/omerbsezer/Module2
Module 1 Receiving Side has 2 part: MainScreen (for gui) and Thread1 (for connecting activemq and receiving data ) .
MainScreen.java
package MainPackage;
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Shell;
import org.eclipse.swt.widgets.Label;
import org.eclipse.swt.SWT;
import org.eclipse.swt.widgets.Text;
import org.eclipse.swt.widgets.Group;
public class MainScreen {
protected Shell shlModule;
public static Text ReceivingText;
public static String string_ref_temp=null;
private Group grpModuleReceiving;
/**
* Launch the application.
* @param args
*/
public static void main(String[] args) {
try {
MainScreen window = new MainScreen();
window.open();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void doUpdate(final Display display, final Text target,
final String value) {
Display.getDefault().asyncExec(new Runnable() {
@Override
public void run() {
if (!target.isDisposed()) {
target.setText(value);
target.getParent().layout();
}
}
});
}
public static void UpdateInfo(final Display display, final Label target,
final String value) {
Display.getDefault().asyncExec(new Runnable() {
@Override
public void run() {
if (!target.isDisposed()) {
target.setText(value);
target.getParent().layout();
}
}
});
}
/**
* Open the window.
*/
public void open() {
Display display = Display.getDefault();
createContents();
Thread1 MThread = new Thread1();
Thread t = new Thread(MThread);
t.start();
shlModule.open();
shlModule.layout();
while (!shlModule.isDisposed()) {
if (!display.readAndDispatch()) {
display.sleep();
}
}
}
/**
* Create contents of the window.
*/
protected void createContents() {
shlModule = new Shell();
shlModule.setSize(450, 198);
shlModule.setText("Module 1");
grpModuleReceiving = new Group(shlModule, SWT.NONE);
grpModuleReceiving.setText("Module 1 Receiving Part");
grpModuleReceiving.setBounds(29, 10, 389, 117);
ReceivingText = new Text(grpModuleReceiving, SWT.BORDER);
ReceivingText.setBounds(168, 49, 104, 27);
Label lblReceiving = new Label(grpModuleReceiving, SWT.NONE);
lblReceiving.setBounds(81, 59, 68, 17);
lblReceiving.setText("Receiving:");
}
}
Thread1.java
package MainPackage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.swt.widgets.Display;
public class Thread1 extends MainScreen implements Runnable {
public Destination dest_ref_temp;
public MessageConsumer consumer_ref_temp;
public String user=null,password=null,host=null;
public String text_ref_temp = null;
public int port;
public Session session_ref_temp;
private Display display;
public Display getDisplay(){
return display;
}
@Override
public void run() {
// TODO Auto-generated method stub
user = env("ACTIVEMQ_USER", "admin");
password = env("ACTIVEMQ_PASSWORD", "password");
host = env("ACTIVEMQ_HOST", "localhost");
int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));
//String destination = arg(args, 0, "event");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
Connection connection = null;
try {
connection = factory.createConnection(user, password);
//connection = factory.createConnection();
connection.start();
session_ref_temp = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest_ref_temp = session_ref_temp.createQueue("REF_TEMP.FOO");
consumer_ref_temp = session_ref_temp.createConsumer(dest_ref_temp);
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
System.out.println("Waiting for messages...");
while (true) {
// Wait for a message
Message message_ref_temp = null;
try {
message_ref_temp = consumer_ref_temp.receive(1000);
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
if (message_ref_temp == null){
continue;
}
if (message_ref_temp instanceof TextMessage ) {
TextMessage textMessage_ref_temp = (TextMessage) message_ref_temp;
try {
text_ref_temp = textMessage_ref_temp.getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
string_ref_temp=String.valueOf(text_ref_temp);
doUpdate(display,ReceivingText,text_ref_temp);
} else {
}
}
/*try {
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
}
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if( rc== null )
return defaultValue;
return rc;
}
}
Module 2 Sending Side has 2 part: M2MainScreen (for gui) and SendingThread (for connecting activemq and sending data ) .
M2MainScreen.java
package MainPackage;
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Event;
import org.eclipse.swt.widgets.Listener;
import org.eclipse.swt.widgets.Shell;
import org.eclipse.swt.widgets.Text;
import org.eclipse.swt.SWT;
import org.eclipse.swt.widgets.Label;
import org.eclipse.swt.widgets.Button;
import org.eclipse.swt.widgets.Group;
import org.eclipse.swt.widgets.ProgressBar;
public class M2MainScreen {
protected Shell shlModule;
public static Display display;
public static Text ref_temp;
public static Button btnStart;
public static Button btnStop;
public static Label lblInfo;
public static String string_ref_temp;
public static String isRunning;
public static int i;
public static int stat=1;
public static int ref_temp_value=0;
/**
* Launch the application.
* @param args
*/
public static void main(String[] args) {
try {
M2MainScreen window = new M2MainScreen();
window.open();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void doUpdate(final Display display, final Text target,
final String value) {
Display.getDefault().asyncExec(new Runnable() {
@Override
public void run() {
if (!target.isDisposed()) {
target.setText(value);
target.getParent().layout();
}
}
});
}
/**
* Open the window.
*/
public void open() {
Display display = Display.getDefault();
createContents();
Listener listener = new Listener() {
public void handleEvent(Event event) {
if (event.widget == btnStart) {
//response[0] = true;
lblInfo.setText("Module2 Sending Action Started!");
isRunning="true";
SendingThread sendingThread = new SendingThread();
Thread t = new Thread(sendingThread);
t.start();
} else if (event.widget == btnStop) {
lblInfo.setText("Module2 Sending Action Stopped!");
isRunning="false";
stat=1;
}
else{
}
//dialog.close();
}
};
btnStart.addListener(SWT.Selection, listener);
btnStop.addListener(SWT.Selection, listener);
shlModule.open();
shlModule.layout();
while (!shlModule.isDisposed()) {
if (!display.readAndDispatch()) {
display.sleep();
}
}
}
/**
* Create contents of the window.
*/
protected void createContents() {
shlModule = new Shell();
shlModule.setSize(450, 268);
shlModule.setText("Module 2");
lblInfo = new Label(shlModule, SWT.NONE);
lblInfo.setBounds(79, 216, 290, 17);
Group grpModuleSending = new Group(shlModule, SWT.NONE);
grpModuleSending.setText("Module 2 Sending Part");
grpModuleSending.setBounds(33, 10, 375, 200);
Label lblNewLabel = new Label(grpModuleSending, SWT.NONE);
lblNewLabel.setBounds(59, 60, 68, 17);
lblNewLabel.setText("Sending: ");
ref_temp = new Text(grpModuleSending, SWT.BORDER);
ref_temp.setBounds(133, 50, 75, 27);
btnStart = new Button(grpModuleSending, SWT.NONE);
btnStart.setBounds(134, 99, 88, 29);
btnStart.setText("Start");
btnStop = new Button(grpModuleSending, SWT.NONE);
btnStop.setBounds(228, 99, 88, 29);
btnStop.setText("Stop");
Label lblStatus = new Label(shlModule, SWT.NONE);
lblStatus.setBounds(10, 216, 50, 17);
lblStatus.setText("Status: "); }
}
SendingThread.java
package MainPackage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SendingThread extends M2MainScreen implements Runnable {
public Destination dest_ref_temp;
public MessageProducer producer_ref_temp;
public String user=null,password=null,host=null,text = null;
public int port;
public Session session_ref_temp;
@Override
public void run() {
//while (isRunning) {
// TODO Auto-generated method stub
String user = env("ACTIVEMQ_USER", "admin");
String password = env("ACTIVEMQ_PASSWORD", "password");
String host = env("ACTIVEMQ_HOST", "localhost");
int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
Connection connection = null;
try {
connection = factory.createConnection(user, password);
connection.start();
session_ref_temp = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest_ref_temp = session_ref_temp.createQueue("REF_TEMP.FOO");
producer_ref_temp = session_ref_temp.createProducer(dest_ref_temp);
producer_ref_temp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
//Destination dest = new ActiveMQTopic(destination);
// Create a messages
while(true){
String text_ref_temp = string_ref_temp;
if(isRunning=="false"){
i=0;
break;
}
text_ref_temp=Integer.toString(i);
i++;
TextMessage message_ref_temp;
try {
message_ref_temp = session_ref_temp.createTextMessage(text_ref_temp);
producer_ref_temp.send(message_ref_temp);
//System.out.println("Send1: " + text);
doUpdate(display,ref_temp,text_ref_temp);
try {
Thread.sleep(500);
} catch(InterruptedException e) {
}
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
//producer.send(session.createTextMessage("SHUTDOWN"));
/*try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
}
}
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if( rc== null )
return defaultValue;
return rc;
}
}