This project has retired. For details please refer to its
Attic page.
AMQPNotifier xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.apache.juddi.subscription.notify;
17
18 import java.net.URISyntaxException;
19 import java.rmi.RemoteException;
20 import java.util.Properties;
21 import java.util.logging.Level;
22 import java.util.logging.Logger;
23 import javax.jms.Connection;
24 import javax.jms.ConnectionFactory;
25 import javax.jms.Destination;
26 import javax.jms.JMSException;
27 import javax.jms.MessageConsumer;
28 import javax.jms.MessageProducer;
29 import javax.jms.Session;
30 import javax.jms.TextMessage;
31 import javax.naming.Context;
32 import javax.naming.InitialContext;
33 import javax.naming.NamingException;
34
35 import org.apache.commons.configuration.ConfigurationException;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.juddi.api_v3.AccessPointType;
39 import org.apache.juddi.config.AppConfig;
40 import org.apache.juddi.jaxb.JAXBMarshaller;
41 import org.apache.juddi.model.BindingTemplate;
42 import org.uddi.api_v3.DispositionReport;
43 import org.uddi.api_v3.Result;
44 import org.uddi.subr_v3.NotifySubscriptionListener;
45 import org.uddi.v3_service.DispositionReportFaultMessage;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public class AMQPNotifier implements Notifier {
69
70 Log log = LogFactory.getLog(this.getClass());
71 String destination = null;
72
73 String exchangeType = null;
74 String exchangeName = null;
75
76 public AMQPNotifier(BindingTemplate bindingTemplate) throws URISyntaxException, ConfigurationException {
77 super();
78 if (!AccessPointType.END_POINT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType())) {
79 log.error("AMQP enpoints only support AccessPointType " + AccessPointType.END_POINT);
80 }
81 String accessPointUrl = bindingTemplate.getAccessPointUrl().toLowerCase();
82 if (!accessPointUrl.startsWith("amqp:")) {
83 log.warn("AMQP accessPointUrl for bindingTemplate " + bindingTemplate.getEntityKey()
84 + " should start with 'amqp:'");
85 }
86 destination = accessPointUrl;
87 for (int i = 0; i < bindingTemplate.getTmodelInstanceInfos().size(); i++) {
88 if (bindingTemplate.getTmodelInstanceInfos().get(i).getTmodelKey().equals(Demo.TMODEL_DESTINATION_TYPE)) {
89 exchangeType = bindingTemplate.getTmodelInstanceInfos().get(i).getInstanceParms();
90 }
91 if (bindingTemplate.getTmodelInstanceInfos().get(i).getTmodelKey().equals(Demo.TMODEL_DESTINATION_NAME)) {
92 exchangeName = bindingTemplate.getTmodelInstanceInfos().get(i).getInstanceParms();
93 }
94 }
95 }
96
97 @Override
98 public DispositionReport notifySubscriptionListener(NotifySubscriptionListener body) throws DispositionReportFaultMessage, RemoteException {
99 Connection connection = null;
100 Context context = null;
101 boolean success = false;
102 String err = null;
103 try {
104 if (destination != null && exchangeType != null && exchangeName != null) {
105 log.info("Sending notification AMQP to " + destination);
106 Properties properties = new Properties();
107
108 properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
109 properties.put("connectionfactory.qpidConnectionfactory", destination);
110 properties.put("destination." + exchangeName,exchangeType);
111
112 context = new InitialContext(properties);
113
114 ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory");
115 connection = connectionFactory.createConnection();
116 connection.start();
117
118 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
119 Destination destinationLocal = (Destination) context.lookup(exchangeName);
120
121 MessageProducer messageProducer = session.createProducer(destinationLocal);
122
123 String subscriptionResultXML = JAXBMarshaller.marshallToString(body, JAXBMarshaller.PACKAGE_SUBSCR_RES);
124 TextMessage message = session.createTextMessage(subscriptionResultXML);
125 messageProducer.send(message);
126 success = true;
127
128 }
129 } catch (Exception e) {
130 e.printStackTrace();
131 log.error("Error deliverying AMQP subscription " + e.getMessage());
132 log.debug("Error deliverying AMQP subscription " + e.getMessage(),e);
133 err = e.getMessage();
134
135 } finally {
136 try {
137 if (connection != null) {
138 connection.close();
139 }
140 } catch (JMSException ex) {
141 log.error(null, ex);
142 }
143 try {
144 if (context != null) {
145 context.close();
146 }
147 } catch (NamingException ex) {
148 log.error(null, ex);
149 }
150 }
151 if (!success) {
152 throw new DispositionReportFaultMessage(err, null);
153 }
154 DispositionReport dr = new DispositionReport();
155 Result res = new Result();
156 dr.getResult().add(res);
157
158 return dr;
159 }
160 }