This project has retired. For details please refer to its
Attic page.
ReplicationNotifier xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.juddi.replication;
18
19 import java.io.UnsupportedEncodingException;
20 import java.util.ArrayList;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Queue;
25 import java.util.Set;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.logging.Level;
30 import java.util.logging.Logger;
31 import javax.persistence.EntityManager;
32 import javax.persistence.EntityTransaction;
33 import javax.persistence.Query;
34 import javax.xml.bind.JAXB;
35 import javax.xml.ws.BindingProvider;
36 import org.apache.commons.configuration.ConfigurationException;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.juddi.api_v3.Node;
40 import org.apache.juddi.config.AppConfig;
41 import org.apache.juddi.config.PersistenceManager;
42 import org.apache.juddi.config.Property;
43 import org.apache.juddi.mapping.MappingApiToModel;
44 import org.apache.juddi.mapping.MappingModelToApi;
45 import org.apache.juddi.model.ChangeRecord;
46 import org.apache.juddi.model.ReplicationConfiguration;
47 import org.apache.juddi.v3.client.UDDIService;
48 import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
49 import org.uddi.repl_v3.ChangeRecordIDType;
50 import org.uddi.repl_v3.CommunicationGraph.Edge;
51 import org.uddi.repl_v3.HighWaterMarkVectorType;
52 import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
53 import org.uddi.repl_v3.Operator;
54 import org.uddi.v3_service.UDDIReplicationPortType;
55
56
57
58
59
60
61
62
63
64 public class ReplicationNotifier extends TimerTask {
65
66 private static Log log = LogFactory.getLog(ReplicationNotifier.class);
67 private Timer timer = null;
68 private long startBuffer = 5000;
69 private long interval = 5000;
70 private static String node = null;
71 private static UDDIService uddiService = new UDDIService();
72
73
74
75
76
77
/**
 * Creates the notifier and hands off to init(), which reads the configuration
 * and schedules this task on its own timer.
 *
 * @throws ConfigurationException propagated from init()
 */
public ReplicationNotifier() throws ConfigurationException {
    // TimerTask's no-arg constructor is invoked implicitly.
    init();
}
82 private synchronized void init() throws ConfigurationException {
83 timer = new Timer(true);
84 startBuffer=AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000l);
85 interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000l);
86 timer.scheduleAtFixedRate(this, startBuffer, interval);
87 if (queue == null) {
88 queue = new ConcurrentLinkedQueue();
89 }
90 node = AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ID, "UNDEFINED_NODE_NAME");
91 }
92
93 @Override
94 public boolean cancel() {
95 timer.cancel();
96
97 return super.cancel();
98 }
99
100
101 public synchronized static void enqueue(org.apache.juddi.model.ChangeRecord change) {
102 if (queue == null) {
103 queue = new ConcurrentLinkedQueue<org.apache.juddi.model.ChangeRecord>();
104 }
105 queue.add(change);
106 }
107
108 public synchronized static void EnqueueRetransmit(org.uddi.repl_v3.ChangeRecord change) {
109 if (queue2 == null) {
110 queue2 = new ConcurrentLinkedQueue<org.uddi.repl_v3.ChangeRecord>();
111 }
112 queue2.add(change);
113 }
114 static Queue<org.apache.juddi.model.ChangeRecord> queue;
115 static Queue<org.uddi.repl_v3.ChangeRecord> queue2;
116
117
118
119
120
121
122
123
124 protected void ProcessChangeRecord(org.apache.juddi.model.ChangeRecord j) {
125
126
127
128
129 EntityManager em = PersistenceManager.getEntityManager();
130 EntityTransaction tx = null;
131 try {
132 tx = em.getTransaction();
133 tx.begin();
134 j.setIsAppliedLocally(true);
135 em.persist(j);
136 j.setOriginatingUSN(j.getId());
137 em.merge(j);
138 log.info("CR saved locally, it was from " + j.getNodeID()
139 + " USN:" + j.getOriginatingUSN()
140 + " Type:" + j.getRecordType().name()
141 + " Key:" + j.getEntityKey()
142 + " Local id:" + j.getId());
143 tx.commit();
144 } catch (Exception ex) {
145 log.fatal("unable to store local change record locally!!", ex);
146 if (tx != null && tx.isActive()) {
147 tx.rollback();
148 }
149 JAXB.marshal(MappingModelToApi.mapChangeRecord(j), System.out);
150 } finally {
151 em.close();
152 }
153
154 log.debug("ChangeRecord: " + j.getId() + "," + j.getEntityKey() + "," + j.getNodeID() + "," + j.getOriginatingUSN() + "," + j.getRecordType().toString());
155 SendNotifications(j.getId(), j.getNodeID(), false);
156
157 }
158
159 private void SendNotifications(Long id, String origin_node, boolean isRetrans) {
160
161 org.uddi.repl_v3.ReplicationConfiguration repcfg = FetchEdges();
162
163 if (repcfg == null) {
164 log.debug("No replication configuration is defined!");
165 return;
166
167 }
168 if (id == null || origin_node == null) {
169 log.fatal("Either the id is null or the origin_node is null. I can't send out this alert!!");
170
171 return;
172 }
173
174 Set<Object> destinationUrls = new HashSet<Object>();
175
176
177
178
179
180
181
182 if (repcfg.getCommunicationGraph() == null
183 || repcfg.getCommunicationGraph().getEdge().isEmpty() && !isRetrans) {
184
185
186 for (Operator o : repcfg.getOperator()) {
187
188 if (!o.getOperatorNodeID().equalsIgnoreCase(node) && !o.getOperatorNodeID().equalsIgnoreCase(origin_node)) {
189 destinationUrls.add(o.getSoapReplicationURL());
190 }
191 }
192 } else {
193
194
195 Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
196 while (iterator.hasNext()) {
197 Edge next = iterator.next();
198
199 if (next.getMessageSender().equalsIgnoreCase(node)) {
200
201
202 String messageReceiver = next.getMessageReceiver();
203 PrimaryAlternate container = new PrimaryAlternate();
204
205 if (!messageReceiver.equalsIgnoreCase(node) && !messageReceiver.equalsIgnoreCase(origin_node)) {
206
207 for (int x = 0; x < repcfg.getOperator().size(); x++) {
208 if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(messageReceiver)) {
209 container.primaryUrl = repcfg.getOperator().get(x).getSoapReplicationURL();
210 }
211 }
212 for (int y = 0; y < next.getMessageReceiverAlternate().size(); y++) {
213 for (int x = 0; x < repcfg.getOperator().size(); x++) {
214 if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(next.getMessageReceiverAlternate().get(y))) {
215 container.alternateUrls.add(repcfg.getOperator().get(x).getSoapReplicationURL());
216 }
217 }
218 }
219 }
220 if (container.primaryUrl != null) {
221 destinationUrls.add(container);
222 } else {
223 log.warn("Unable to find primary url for directed edge graph replication from this node " + node + " to "
224 + "destination node " + next.getMessageReceiver() + " it will be ignored!");
225 }
226
227 }
228
229 }
230
231 }
232
233 if (destinationUrls.isEmpty()) {
234 log.debug("Something is bizarre with the replication config. I should have had at least one node to notify, but I have none!");
235 return;
236 }
237 UDDIReplicationPortType x = uddiService.getUDDIReplicationPort();
238 TransportSecurityHelper.applyTransportSecurity((BindingProvider) x);
239
240 for (Object s : destinationUrls) {
241
242 NotifyChangeRecordsAvailable req = new NotifyChangeRecordsAvailable();
243
244 req.setNotifyingNode(node);
245 HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();
246
247 highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(origin_node, id));
248 req.setChangesAvailable(highWaterMarkVectorType);
249
250 if (s instanceof String) {
251 SendNotification(x, (String) s, req);
252 } else if (s instanceof PrimaryAlternate) {
253
254 PrimaryAlternate pa = (PrimaryAlternate) s;
255 if (!SendNotification(x, pa.primaryUrl, req)) {
256 for (String url : pa.alternateUrls) {
257 if (SendNotification(x, url, req)) {
258 break;
259 }
260
261 }
262 } else {
263
264 }
265
266 }
267
268
269 }
270
271 }
272
273
274
275
276
277
278
279
280
281 private boolean SendNotification(UDDIReplicationPortType x, String s, NotifyChangeRecordsAvailable req) {
282 ((BindingProvider) x).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, s);
283 try {
284 x.notifyChangeRecordsAvailable(req);
285 log.info("Successfully sent change record available message to " + s + " this node: " + node);
286 return true;
287 } catch (Exception ex) {
288 log.warn("Unable to send change notification to " + s + " this node: " + node + " reason: " + ex.getMessage());
289 log.debug("Unable to send change notification to " + s, ex);
290 }
291 return false;
292 }
293
294 private static class PrimaryAlternate {
295
296 String primaryUrl = null;
297 List<String> alternateUrls = new ArrayList<String>();
298 }
299
300 public synchronized void run() {
301 log.debug("Replication thread triggered");
302 if (queue == null) {
303 queue = new ConcurrentLinkedQueue();
304 }
305 if (queue2 == null) {
306 queue2 = new ConcurrentLinkedQueue();
307 }
308
309 if (!queue.isEmpty()) {
310 log.info("Replication, Notifying nodes of new change records. " + queue.size() + " queued");
311 }
312
313
314 while (!queue.isEmpty()) {
315
316
317 ChangeRecord j = queue.poll();
318 ProcessChangeRecord(j);
319
320 }
321
322 while (!queue2.isEmpty()) {
323
324
325 org.uddi.repl_v3.ChangeRecord j = queue2.poll();
326
327 ChangeRecord model = new ChangeRecord();
328 try {
329 model = MappingApiToModel.mapChangeRecord(j);
330 } catch (UnsupportedEncodingException ex) {
331 Logger.getLogger(ReplicationNotifier.class.getName()).log(Level.SEVERE, null, ex);
332 }
333 log.info("retransmitting CR notificationm entity owner: " + j.getChangeID().getNodeID() + " CR: " + j.getChangeID().getOriginatingUSN() + " key:" + model.getEntityKey() + " " + model.getRecordType().name() + " accepted locally:" + model.getIsAppliedLocally());
334 SendNotifications(j.getChangeID().getOriginatingUSN(), j.getChangeID().getNodeID(), true);
335
336 }
337 }
338
339
340
341
342
343
344
345 public static org.uddi.repl_v3.ReplicationConfiguration FetchEdges() {
346
347 EntityManager em = PersistenceManager.getEntityManager();
348 EntityTransaction tx = null;
349 org.uddi.repl_v3.ReplicationConfiguration item = new org.uddi.repl_v3.ReplicationConfiguration();
350 try {
351 tx = em.getTransaction();
352 tx.begin();
353 Query q = em.createQuery("SELECT item FROM ReplicationConfiguration item order by item.serialNumber DESC");
354 q.setMaxResults(1);
355 ReplicationConfiguration results = (ReplicationConfiguration) q.getSingleResult();
356
357 if (results != null) {
358 MappingModelToApi.mapReplicationConfiguration(results, item);
359 }
360
361 tx.commit();
362 return item;
363 } catch (Exception ex) {
364
365
366 if (tx != null && tx.isActive()) {
367 tx.rollback();
368 }
369 } finally {
370 em.close();
371 }
372 return null;
373 }
374
375 @Deprecated
376 private Node getNode(String messageSender) {
377 EntityManager em = PersistenceManager.getEntityManager();
378 EntityTransaction tx = null;
379 try {
380 tx = em.getTransaction();
381 tx.begin();
382 Node api = new Node();
383 org.apache.juddi.model.Node find = em.find(org.apache.juddi.model.Node.class, messageSender);
384 if (find != null) {
385 MappingModelToApi.mapNode(find, api);
386 }
387 tx.commit();
388 return api;
389 } catch (Exception ex) {
390 log.error("error", ex);
391 if (tx != null && tx.isActive()) {
392 tx.rollback();
393 }
394 } finally {
395 em.close();
396 }
397 return null;
398 }
399 }