Sunday, 1 May 2011

A little leap: invoking an HTML5 WebSocket from PL/SQL with Oracle Advanced Queuing (Oracle JMS).

How it started

Recently I had to organize an HTML5 coding dojo at the company where I work. The difficult part was not HTML5 itself but finding an interesting use-case to code around. A major constraint was that the dojo audience was not a homogeneous group: it was primarily a mix of Java and PL/SQL programmers.

The use-case

To find a suitable use-case, I first launched a small internal poll to collect ideas. The response was quite poor and none of the proposals met my needs, but just as I was considering giving up, I had a little epiphany :-)… What if I could invoke a WebSocket from PL/SQL code? Wouldn’t that be a nice use-case for the entire audience? This is what I came up with…

  • WebSocket gateway: Kaazing Gateway.
  • Oracle Advanced Queuing (Oracle JMS).
  • A self-written proxy between the WebSocket gateway and Oracle AQ.

Kaazing Gateway

First the easy part: install and configure the Kaazing Gateway.
I just downloaded Kaazing WebSocket Gateway - HTML5 Edition 3.1.6 here, and followed the getting started documentation.
For my use-case I added a new service to gateway-config.xml (for more details see the Administrator’s Guide).

<service>
  <accept>ws://hostname:8002/oaq</accept>
  <type>broadcast</type>
  <properties>
    <accept>udp://hostname:9876</accept>
  </properties>

  <cross-site-constraint>
    <allow-origin>*</allow-origin>
  </cross-site-constraint>
</service>

Note
You have to replace hostname with your server’s hostname.


Oracle Advanced Queuing


This section describes how to set up Oracle Advanced Queuing.

--Oracle schema grants needed for AQ

GRANT EXECUTE ON dbms_aq TO [user];
GRANT EXECUTE ON dbms_aqadm TO [user];
GRANT EXECUTE ON dbms_aqin TO [user];

--Prepare schema for Oracle AQ

--Create object type

CREATE OR REPLACE TYPE notification_type AS OBJECT
( table_name        varchar2(100)
, key_name          varchar2(100)
, key_              number(10)
, message           varchar2(1000)
, user_             varchar2(20)
, action            varchar2(100)
);
/

--Create queue table

BEGIN
  dbms_aqadm.create_queue_table
  ( queue_table         => 'aq_notifications'
  , queue_payload_type  => 'notification_type'
  , multiple_consumers  => false
  , comment             => 'AQ Notification Queue'
  );
END;
/

--Create queue

BEGIN
  dbms_aqadm.create_queue
  ( queue_name  => 'notification_queue'
  , queue_table => 'aq_notifications'
  );
END;
/

--Start queuing

BEGIN
  dbms_aqadm.start_queue( queue_name => 'notification_queue');
END;
/

--Procedure to enqueue new notifications

CREATE OR REPLACE
PROCEDURE enqueue_notification_type
( p_notification in notification_type)
IS
  queue_options       dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_id          raw(16);
BEGIN
  dbms_aq.enqueue
  ( queue_name         => 'notification_queue'
  , enqueue_options    => queue_options
  , message_properties => message_properties
  , payload            => p_notification
  , msgid              => message_id
  );
END;
/

--Enqueue a message (the message only becomes visible to the proxy after the COMMIT)

EXEC enqueue_notification_type( p_notification  => notification_type('DummyAQ', 'ID', 1, 'Test 1', 'SYS', 'EXEC'));
COMMIT;

Important note
The Java proxy described in the next section needs a Java wrapper type for the Oracle notification object type.
If you change the Oracle notification object type, the proxy has to be rebuilt with a newly generated Java wrapper.
The Java wrapper is generated with JPublisher (which comes with SQLJ). To generate it, execute the following command:
jpub -user=[user]/[password] -sql=Notification_Type -usertypes=oracle -methods=false


Oracle AQ WebSocket Proxy



The proxy consists of three classes:


  • AQWebSocketProxy: the Proxy.
  • Notification_Type: Java Wrapper for the Oracle Notification object type.
  • Notification_TypeRef: Java wrapper for references (REF) to the Oracle notification object type.

The following snippets are taken from the proxy class.
The most important things the class does are described below.
First, it listens on the Oracle AQ queue and dequeues a message as soon as one becomes available.

    private void dequeue() {
        try {
            LOGGER.info("Start listening single consumer queue...");
            final AQOracleDriver driver = new AQOracleDriver();
            while (true) {
                final AQSession session = driver.createAQSession(connection);
                //Create reference to queue.
                final AQQueue queue = session.getQueue(username, queueName);
                final AQDequeueOption dequeueOption = new AQDequeueOption();
                //Set dequeue mode to BROWSE (messages won't get deleted after dequeue).
                //dequeueOption.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE);
                //Block until a message arrives.
                dequeueOption.setWaitTime(AQDequeueOption.WAIT_FOREVER);
                LOGGER.info("Waiting for message to dequeue...");
                final AQMessage message = queue.dequeue(dequeueOption, Notification_Type.getORADataFactory());
                //Render the raw id as text; concatenating a byte[] only prints its object identity.
                final String msgId = java.util.Arrays.toString(message.getMessageId());
                LOGGER.info("Dequeue starts for message: " + msgId + "  ...");
                final AQObjectPayload payload = message.getObjectPayload();
                //Retrieve data.
                final Notification_Type messageData = (Notification_Type) payload.getPayloadData();
                LOGGER.debug("[BEGIN Message Data]");
                LOGGER.debug(messageData.getTableName());
                LOGGER.debug(messageData.getKeyName());
                LOGGER.debug(messageData.getKey());
                LOGGER.debug(messageData.getMessage());
                LOGGER.debug(messageData.getUser());
                LOGGER.debug(messageData.getAction());
                LOGGER.debug("[END Message Data]");
                //Commit the dequeue so the message is removed from the queue.
                connection.commit();
                sendUDPPacket(messageData);
                LOGGER.info("Dequeue for message: " + msgId + " done.");
            }
        } catch (SQLException sqlex) {
            LOGGER.error(sqlex.getMessage(), sqlex);
        } catch (AQException aqe) {
            LOGGER.error(aqe.getMessage(), aqe);
        } catch (SocketException se) {
            LOGGER.error(se.getMessage(), se);
        } catch (UnknownHostException uhe) {
            LOGGER.error(uhe.getMessage(), uhe);
        } catch (IOException ioe) {
            LOGGER.error(ioe.getMessage(), ioe);
        }
    }
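
One thing to be aware of in dequeue(): the catch blocks sit outside the while loop, so the first exception ends the listener. If you would rather keep listening, one option is to catch per iteration and pause before trying again. The following is only a sketch of that idea. Step, runWithRetry and the iteration limit are names made up for illustration; they are not part of the proxy or of the Oracle AQ API.

```java
public class RetryLoopSketch {

    /** Hypothetical stand-in for one "dequeue a message and forward it" step. */
    public interface Step {
        void run() throws Exception;
    }

    /**
     * Runs the step up to maxIterations times. A failing iteration is logged
     * and followed by a pause; it does not end the loop.
     * Returns the number of iterations that failed.
     */
    public static int runWithRetry(Step step, int maxIterations, long pauseMillis) {
        int failures = 0;
        for (int i = 0; i < maxIterations; i++) {
            try {
                step.run();
            } catch (Exception e) {
                failures++;
                System.err.println("Iteration " + i + " failed: " + e.getMessage());
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop looping.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return failures;
    }
}
```

In a long-running proxy the bounded for loop would presumably become while (true); the limit is only there so the sketch terminates.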

The dequeued message is first translated to JSON and then sent to the WebSocket gateway over UDP.

    private String convertToJSON(final Notification_Type message) {
        final JsonFactory factory = new JsonFactory();
        final StringWriter writer = new StringWriter();
        JsonGenerator gen;
        try {
            gen = factory.createJsonGenerator(writer);
            gen.writeStartObject();
            gen.writeStringField("TableName", message.getTableName());
            gen.writeStringField("KeyName", message.getKeyName());
            gen.writeNumberField("Key", message.getKey());
            gen.writeStringField("Message", message.getMessage());
            gen.writeStringField("User", message.getUser());
            gen.writeStringField("Action", message.getAction());
            gen.writeEndObject();
            gen.close();
        } catch (IOException ioe) {
            LOGGER.error(ioe.getMessage(), ioe);
        } catch (SQLException se) {
            LOGGER.error(se.getMessage(), se);
        }
        final String json = writer.getBuffer().toString();
        LOGGER.debug("[BEGIN JSON Format]");
        LOGGER.debug(json);
        LOGGER.debug("[END JSON Format]");
        return json;
    }

    protected void sendUDPPacket(final Notification_Type message) throws SocketException, UnknownHostException,
                                                                         IOException {
        final DatagramSocket clientSocket = new DatagramSocket();
        try {
            final InetAddress ipAddress = InetAddress.getByName(gatewayHost);
            //Encode explicitly as UTF-8 instead of relying on the platform default charset.
            final byte[] sendData = convertToJSON(message).getBytes("UTF-8");
            //9876 is the UDP port configured in the gateway's broadcast service.
            final DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress, 9876);
            clientSocket.send(sendPacket);
        } finally {
            //Always release the socket, even if sending fails.
            clientSocket.close();
        }
    }
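
To get a feel for the UDP hop without a gateway or a database, the sending side can be tried against a local receiver. The sketch below is not part of the proxy: UdpRoundTripSketch and its methods are hypothetical names, the receiver merely stands in for the gateway's udp:// accept, and the 2-second timeout and 2048-byte buffer are arbitrary choices. It uses only the JDK (Java 7 or later).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class UdpRoundTripSketch {

    /** Sends the text as one UTF-8 encoded datagram to host:port. */
    public static void send(String text, InetAddress host, int port) throws IOException {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(data, data.length, host, port));
        }
    }

    /** Waits for one datagram on the given socket and decodes it as UTF-8. */
    public static String receive(DatagramSocket socket, int maxBytes) throws IOException {
        byte[] buffer = new byte[maxBytes];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        socket.receive(packet);
        return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                StandardCharsets.UTF_8);
    }

    /** Sends the text to a receiver on a free loopback port and returns what arrived. */
    public static String roundTrip(String text) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback)) {
            receiver.setSoTimeout(2000); // give up after 2 seconds instead of blocking forever
            send(text, loopback, receiver.getLocalPort());
            return receive(receiver, 2048);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling UdpRoundTripSketch.roundTrip with a payload shaped like the proxy's JSON, for example {"TableName":"DummyAQ","KeyName":"ID","Key":1,"Message":"Test 1","User":"SYS","Action":"EXEC"}, should hand the same string back. Keep in mind that UDP gives no delivery guarantee, and a message larger than one datagram will not fit in a single packet.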

I made one little adjustment to the Wrapper files to make them ‘schema’ independent (I pass the schema name from the Proxy class):

public class Notification_TypeRef implements ORAData, ORADataFactory {
    public static final String _SQL_BASETYPE = AQWebSocketProxy.username + ".NOTIFICATION_TYPE";
    //... remainder of the generated class is unchanged
}

public class Notification_Type implements ORAData, ORADataFactory {
    public static final String _SQL_NAME = AQWebSocketProxy.username + ".NOTIFICATION_TYPE";
    //... remainder of the generated class is unchanged
}

The complete source code of the proxy is available in the download section below. It includes a runnable jar, all the source code, an HTML test page and a complete README file to get you started right away.


Run the Proxy and test it


First run the Kaazing Gateway.
Put the HTML5 WebSocket test page (included in the zip file; don’t forget to adjust the hostname to point to the Kaazing gateway host) on a web server and open it in Google Chrome.
Then run the AQWebSocketProxy (I ran it on the same server as the Kaazing gateway):
java -jar AQWebSocketProxy.jar -user scott -pwd tiger -dbhost localhost -dbsid ORCL -queue notification_queue -wsgateway localhost -debug
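
For readers wondering how flags like these could end up as connection settings, here is a small sketch. It does not reproduce the proxy's own argument handling (see the source in the download for that): ProxyArgsSketch, parse and jdbcUrl are hypothetical names, and the listener port 1521 is an assumption, since the command line does not specify a port. The URL follows the usual Oracle thin driver form jdbc:oracle:thin:@host:port:SID.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProxyArgsSketch {

    /**
     * Turns "-name value" pairs into a map. A flag that is not followed by a
     * value (such as -debug) is stored with the value "true". Tokens that do
     * not start with '-' and were not consumed as a value are ignored.
     */
    public static Map<String, String> parse(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (!args[i].startsWith("-")) {
                continue;
            }
            String name = args[i].substring(1);
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("-");
            options.put(name, hasValue ? args[++i] : "true");
        }
        return options;
    }

    /** Builds a thin-driver JDBC URL; port 1521 is assumed, not taken from the flags. */
    public static String jdbcUrl(Map<String, String> options) {
        return "jdbc:oracle:thin:@" + options.get("dbhost") + ":1521:" + options.get("dbsid");
    }
}
```

With the flags from the command above, jdbcUrl yields jdbc:oracle:thin:@localhost:1521:ORCL. A value that itself starts with a dash (a password, say) would be misread as a flag by this simple parser.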

If everything works, you can now execute PL/SQL code and see the result in your browser.

EXEC enqueue_notification_type( p_notification  => notification_type('DummyAQ', 'ID', 1, 'Test 1', 'SYS', 'EXEC'));
COMMIT;
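
If you want to watch the broadcast without a browser, a small Java client can play the role of the HTML test page. The sketch below uses the JDK's own java.net.http.WebSocket (Java 11 or later). Treat it as an assumption-laden illustration: GatewayListenerSketch, append and connect are hypothetical names, and whether Kaazing 3.1.6 accepts the standard RFC 6455 handshake this client sends is not something tested here.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

public class GatewayListenerSketch implements WebSocket.Listener {

    private final StringBuilder partial = new StringBuilder();
    private final Consumer<String> onMessage;

    public GatewayListenerSketch(Consumer<String> onMessage) {
        this.onMessage = onMessage;
    }

    /**
     * Collects text fragments. Returns the complete message once the last
     * fragment has arrived, otherwise null.
     */
    public String append(CharSequence data, boolean last) {
        partial.append(data);
        if (!last) {
            return null;
        }
        String message = partial.toString();
        partial.setLength(0);
        return message;
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        String message = append(data, last);
        if (message != null) {
            onMessage.accept(message);
        }
        webSocket.request(1); // ask for the next frame
        return null;          // null means the data may be reclaimed immediately
    }

    /** Opens the connection and blocks until the handshake has completed. */
    public static WebSocket connect(String uri, Consumer<String> onMessage) {
        return HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(URI.create(uri), new GatewayListenerSketch(onMessage))
                .join();
    }
}
```

A call such as GatewayListenerSketch.connect("ws://hostname:8002/oaq", System.out::println) would print each JSON notification as it arrives; the calling program has to stay alive for as long as it wants to receive messages.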

Download & Documentation


The complete source code of the proxy is available here. It includes a runnable jar, all the source code, an HTML test page and a complete README file to get you started in seconds.

