This project has retired. For details please refer to its Attic page.
AMQPNotifier xref
View Javadoc
1   /*
2    * Copyright 2013 The Apache Software Foundation.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.apache.juddi.subscription.notify;
17  
18  import java.net.URISyntaxException;
19  import java.rmi.RemoteException;
20  import java.util.Properties;
21  import java.util.logging.Level;
22  import java.util.logging.Logger;
23  import javax.jms.Connection;
24  import javax.jms.ConnectionFactory;
25  import javax.jms.Destination;
26  import javax.jms.JMSException;
27  import javax.jms.MessageConsumer;
28  import javax.jms.MessageProducer;
29  import javax.jms.Session;
30  import javax.jms.TextMessage;
31  import javax.naming.Context;
32  import javax.naming.InitialContext;
33  import javax.naming.NamingException;
34  
35  import org.apache.commons.configuration.ConfigurationException;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.juddi.api_v3.AccessPointType;
39  import org.apache.juddi.config.AppConfig;
40  import org.apache.juddi.jaxb.JAXBMarshaller;
41  import org.apache.juddi.model.BindingTemplate;
42  import org.uddi.api_v3.DispositionReport;
43  import org.uddi.api_v3.Result;
44  import org.uddi.subr_v3.NotifySubscriptionListener;
45  import org.uddi.v3_service.DispositionReportFaultMessage;
46  
47  /**
48   * AMQP Notifier
49   *
50   * This is designed to enable users to setup AMQP based alerts for UDDI
51   * subscriptions
52   *
53   * This class is partically complete, but it is largely untested and lacks any
54   * kind of
55   *
56   * the following settings need to be added to the juddiv3.xml file
57   * amqp.java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory
58   * amqp.connectionfactory.qpidConnectionfactory amqp.destination=(some topic or
59   * queue name) amqp.destination.type=topic
60   *
61   * usage create a service/bindingtemplate/accessPoint where the value is
62   * amqp://url_to_qpid/amqp The useType must be "endPoint". create a subscription
63   * where the binding template reference points to this endpoint. trigger the
64   * subscription and wait for delivery.
65   *
66   * @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>
67   */
68  public class AMQPNotifier implements Notifier {
69  
70          Log log = LogFactory.getLog(this.getClass());
71          String destination = null;
72  
73          String exchangeType = null;
74          String exchangeName = null;
75  
76          public AMQPNotifier(BindingTemplate bindingTemplate) throws URISyntaxException, ConfigurationException {
77                  super();
78                  if (!AccessPointType.END_POINT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType())) {
79                          log.error("AMQP enpoints only support AccessPointType " + AccessPointType.END_POINT);
80                  }
81                  String accessPointUrl = bindingTemplate.getAccessPointUrl().toLowerCase();
82                  if (!accessPointUrl.startsWith("amqp:")) {
83                          log.warn("AMQP accessPointUrl for bindingTemplate " + bindingTemplate.getEntityKey()
84                                  + " should start with 'amqp:'");
85                  } 
86                  destination = accessPointUrl;
87                  for (int i = 0; i < bindingTemplate.getTmodelInstanceInfos().size(); i++) {
88                          if (bindingTemplate.getTmodelInstanceInfos().get(i).getTmodelKey().equals(Demo.TMODEL_DESTINATION_TYPE)) {
89                                  exchangeType = bindingTemplate.getTmodelInstanceInfos().get(i).getInstanceParms();
90                          }
91                          if (bindingTemplate.getTmodelInstanceInfos().get(i).getTmodelKey().equals(Demo.TMODEL_DESTINATION_NAME)) {
92                                  exchangeName = bindingTemplate.getTmodelInstanceInfos().get(i).getInstanceParms();
93                          }
94                  }
95          }
96  
97          @Override
98          public DispositionReport notifySubscriptionListener(NotifySubscriptionListener body) throws DispositionReportFaultMessage, RemoteException {
99                  Connection connection = null;
100                 Context context = null;
101                 boolean success = false;
102                 String err = null;
103                 try {
104                         if (destination != null && exchangeType != null && exchangeName != null) {
105                                 log.info("Sending notification AMQP to " + destination);
106                                 Properties properties = new Properties();
107 
108                                 properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
109                                 properties.put("connectionfactory.qpidConnectionfactory", destination);
110                                 properties.put("destination." + exchangeName,exchangeType);
111 
112                                 context = new InitialContext(properties);
113 
114                                 ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory");
115                                 connection = connectionFactory.createConnection();
116                                 connection.start();
117 
118                                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
119                                 Destination destinationLocal = (Destination) context.lookup(exchangeName);
120 
121                                 MessageProducer messageProducer = session.createProducer(destinationLocal);
122 
123                                 String subscriptionResultXML = JAXBMarshaller.marshallToString(body, JAXBMarshaller.PACKAGE_SUBSCR_RES);
124                                 TextMessage message = session.createTextMessage(subscriptionResultXML);
125                                 messageProducer.send(message);
126                                 success = true;
127 
128                         }
129                 } catch (Exception e) {
130                         e.printStackTrace();
131                         log.error("Error deliverying AMQP subscription " + e.getMessage());
132                         log.debug("Error deliverying AMQP subscription " + e.getMessage(),e);
133                         err = e.getMessage();
134 
135                 } finally {
136                         try {
137                                 if (connection != null) {
138                                         connection.close();
139                                 }
140                         } catch (JMSException ex) {
141                                 log.error(null, ex);
142                         }
143                         try {
144                                 if (context != null) {
145                                         context.close();
146                                 }
147                         } catch (NamingException ex) {
148                                 log.error(null, ex);
149                         }
150                 }
151                 if (!success) {
152                         throw new DispositionReportFaultMessage(err, null);
153                 }
154                 DispositionReport dr = new DispositionReport();
155                 Result res = new Result();
156                 dr.getResult().add(res);
157 
158                 return dr;
159         }
160 }