1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.juddi.api.impl;
18
19 import java.io.StringWriter;
20 import java.io.UnsupportedEncodingException;
21 import java.math.BigInteger;
22 import java.rmi.RemoteException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.Timer;
32 import java.util.TimerTask;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import javax.jws.WebParam;
35 import javax.jws.WebResult;
36 import javax.jws.WebService;
37 import javax.jws.soap.SOAPBinding;
38 import javax.persistence.EntityManager;
39 import javax.persistence.EntityTransaction;
40 import javax.persistence.Query;
41 import javax.xml.bind.JAXB;
42 import javax.xml.bind.annotation.XmlSeeAlso;
43 import javax.xml.ws.BindingProvider;
44 import org.apache.juddi.api.util.QueryStatus;
45 import org.apache.juddi.api.util.ReplicationQuery;
46 import org.apache.juddi.config.AppConfig;
47 import org.apache.juddi.config.PersistenceManager;
48 import org.apache.juddi.config.Property;
49 import org.apache.juddi.mapping.MappingApiToModel;
50 import org.apache.juddi.mapping.MappingModelToApi;
51 import org.apache.juddi.model.BindingTemplate;
52 import org.apache.juddi.model.BusinessEntity;
53 import org.apache.juddi.model.BusinessService;
54 import org.apache.juddi.model.Operator;
55 import org.apache.juddi.model.PublisherAssertion;
56 import org.apache.juddi.model.PublisherAssertionId;
57 import org.apache.juddi.model.Tmodel;
58 import org.apache.juddi.model.UddiEntity;
59 import org.apache.juddi.replication.ReplicationNotifier;
60 import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
61 import org.apache.juddi.v3.client.UDDIService;
62 import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
63 import org.apache.juddi.v3.error.ErrorMessage;
64 import org.apache.juddi.v3.error.FatalErrorException;
65 import org.apache.juddi.v3.error.TransferNotAllowedException;
66 import org.apache.juddi.validation.ValidateReplication;
67 import org.uddi.custody_v3.TransferEntities;
68 import org.uddi.repl_v3.ChangeRecord;
69 import org.uddi.repl_v3.ChangeRecordAcknowledgement;
70 import org.uddi.repl_v3.ChangeRecordIDType;
71 import org.uddi.repl_v3.ChangeRecords;
72 import org.uddi.repl_v3.CommunicationGraph.Edge;
73 import org.uddi.repl_v3.DoPing;
74 import org.uddi.repl_v3.GetChangeRecords;
75 import org.uddi.repl_v3.HighWaterMarkVectorType;
76 import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
77 import org.uddi.repl_v3.ReplicationConfiguration;
78 import org.uddi.repl_v3.TransferCustody;
79 import org.uddi.v3_service.DispositionReportFaultMessage;
80 import org.uddi.v3_service.UDDIReplicationPortType;
81
82
83
84
85
86
87
88
89
90
91
92
93
94 @WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:api_v3_portType",
95 endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType")
96 @XmlSeeAlso({
97 org.uddi.custody_v3.ObjectFactory.class,
98 org.uddi.repl_v3.ObjectFactory.class,
99 org.uddi.subr_v3.ObjectFactory.class,
100 org.uddi.api_v3.ObjectFactory.class,
101 org.uddi.vscache_v3.ObjectFactory.class,
102 org.uddi.vs_v3.ObjectFactory.class,
103 org.uddi.sub_v3.ObjectFactory.class,
104 org.w3._2000._09.xmldsig_.ObjectFactory.class,
105 org.uddi.policy_v3.ObjectFactory.class,
106 org.uddi.policy_v3_instanceparms.ObjectFactory.class
107 })
108 public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType {
109
110 static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig, AuthenticatedService service) {
111
112
113 Set<String> oldnodes = getNodes(oldConfig);
114 Set<String> newNodes = getNodes(newConfig);
115
116 Set<String> addedNodes = diffNodeList(oldnodes, newNodes);
117 if (queue == null) {
118 queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
119 }
120 for (String s : addedNodes) {
121 if (!s.equals(service.getNode())) {
122 logger.info("This node: " + service.getNode() + ". New replication node queue for synchronization: " + s);
123 HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();
124 highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L));
125 queue.add(new NotifyChangeRecordsAvailable(s, highWaterMarkVectorType));
126 }
127 }
128
129 }
130
131 private static Set<String> getNodes(ReplicationConfiguration oldConfig) {
132 Set<String> ret = new HashSet<String>();
133 if (oldConfig == null) {
134 return ret;
135 }
136 for (org.uddi.repl_v3.Operator o : oldConfig.getOperator()) {
137 ret.add(o.getOperatorNodeID());
138 }
139 if (oldConfig.getCommunicationGraph() != null) {
140 ret.addAll(oldConfig.getCommunicationGraph().getNode());
141 }
142 return ret;
143 }
144
145
146
147
148
149
150
151
152 private static Set<String> diffNodeList(Set<String> oldnodes, Set<String> newNodes) {
153 Set<String> diff = new HashSet<String>();
154 Iterator<String> iterator = newNodes.iterator();
155 while (iterator.hasNext()) {
156 String lhs = iterator.next();
157 Iterator<String> iterator1 = oldnodes.iterator();
158 boolean found = false;
159 while (iterator1.hasNext()) {
160 String rhs = iterator1.next();
161 if (rhs.equalsIgnoreCase(lhs)) {
162 found = true;
163 break;
164 }
165 }
166 if (!found) {
167 diff.add(lhs);
168 }
169
170 }
171 return diff;
172 }
173
174 private UDDIServiceCounter serviceCounter;
175
176 private static PullTimerTask timer = null;
177 private long startBuffer;
178 private long interval;
179
180 private static UDDIPublicationImpl pub = null;
181
182 public UDDIReplicationImpl() {
183 super();
184 try {
185 this.interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000L);
186 this.startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000L);
187 } catch (Exception ex) {
188 logger.warn("Config error!", ex);
189 }
190
191 serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
192 init();
193
194 }
195
196 private synchronized void init() {
197 if (pub == null) {
198 pub = new UDDIPublicationImpl();
199 }
200 if (queue == null) {
201 queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
202 }
203 timer = new PullTimerTask();
204
205 }
206
207
208
209
210
211
212 private class PullTimerTask extends TimerTask {
213
214 private Timer timer = null;
215
216 public PullTimerTask() {
217 super();
218 timer = new Timer(true);
219 timer.scheduleAtFixedRate(this, startBuffer, interval);
220 }
221 boolean firstrun = true;
222
223 @Override
224 public void run() {
225 if (firstrun) {
226 enqueueAllReceivingNodes();
227 firstrun = false;
228 }
229
230 if (!queue.isEmpty()) {
231 logger.info("Replication change puller thread started. Queue size: " + queue.size());
232 }
233
234 while (!queue.isEmpty()) {
235 NotifyChangeRecordsAvailable poll = queue.poll();
236 if (poll != null && !poll.getNotifyingNode().equalsIgnoreCase(getNode())) {
237 UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
238 if (replicationClient == null) {
239 logger.fatal("unable to obtain a replication client to node " + poll);
240 } else {
241 try {
242
243
244
245
246
247
248
249
250 Set<String> nodesHitThisCycle = new HashSet<String>();
251 for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
252 int recordsreturned = 21;
253 while (recordsreturned >= 20) {
254 if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())) {
255 logger.info("i've already hit the node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + " this cycle, skipping");
256 break;
257 }
258 if (poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID().equalsIgnoreCase(getNode())) {
259 logger.info("ignoring updates that were generated here " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN() + " sent by " + poll.getNotifyingNode() + " this node is " + getNode());
260 break;
261 }
262 nodesHitThisCycle.add(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID());
263 GetChangeRecords body = new GetChangeRecords();
264 body.setRequestingNode(getNode());
265 body.setResponseLimitCount(BigInteger.valueOf(100L));
266
267 body.setChangesAlreadySeen(getLastChangeRecordFrom(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()));
268 logger.info("fetching updates from " + poll.getNotifyingNode() + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getNodeID() + ":" + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + ", items still in the queue: " + queue.size());
269
270 List<ChangeRecord> records
271 = replicationClient.getChangeRecords(body).getChangeRecord();
272
273 logger.info("Change records retrieved from " + poll.getNotifyingNode() + ", " + records.size());
274 for (int i = 0; i < records.size(); i++) {
275 logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN());
276 persistChangeRecord(records.get(i));
277 }
278 recordsreturned = records.size();
279 }
280 }
281 } catch (Exception ex) {
282 logger.error("Error caught fetching replication changes from " + poll + " @" + ((BindingProvider) replicationClient).getRequestContext().get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY), ex);
283 }
284 }
285 } else {
286 if (poll == null) {
287 logger.warn("strange, popped a null object");
288 } else if (poll.getNotifyingNode().equalsIgnoreCase(getNode())) {
289 logger.warn("strange, popped an object from the queue but it was from myself. This probably indicates a configuration error! ignoring...first record: " + poll.getChangesAvailable().getHighWaterMark().get(0).getNodeID()+":" + poll.getChangesAvailable().getHighWaterMark().get(0).getOriginatingUSN());
290 }
291 }
292 }
293 }
294
295 @Override
296 public boolean cancel() {
297 timer.cancel();
298 return super.cancel();
299 }
300
301
302
303
304
305
306
307 private void persistChangeRecord(ChangeRecord rec) {
308 if (rec == null) {
309 return;
310 }
311 logger.debug("_______________________Remote change request " + rec.getChangeID().getNodeID() + ":" + rec.getChangeID().getOriginatingUSN());
312
313 if (rec.getChangeID().getNodeID().equalsIgnoreCase(getNode())) {
314 logger.info("Just received a change record that i created, ignoring....");
315 return;
316 }
317 EntityManager em = PersistenceManager.getEntityManager();
318 EntityTransaction tx = em.getTransaction();
319 org.apache.juddi.model.ChangeRecord mapChangeRecord = null;
320
321
322
323
324
325
326
327
328
329
330 try {
331 tx.begin();
332
333 Query createQuery = em.createQuery("select c from ChangeRecord c where c.nodeID=:node and c.originatingUSN=:oid");
334 createQuery.setParameter("node", rec.getChangeID().getNodeID());
335 createQuery.setParameter("oid", rec.getChangeID().getOriginatingUSN());
336 Object existingrecord = null;
337 try {
338 existingrecord = createQuery.getSingleResult();
339 } catch (Exception ex) {
340 logger.debug("error checking to see if change record exists already (expected failure)", ex);
341 }
342 if (existingrecord != null) {
343 logger.info("I've already processed change record " + rec.getChangeID().getNodeID() + " " + rec.getChangeID().getOriginatingUSN());
344 return;
345 }
346
347 ReplicationNotifier.EnqueueRetransmit(rec);
348
349 mapChangeRecord = MappingApiToModel.mapChangeRecord(rec);
350 mapChangeRecord.setId(null);
351 mapChangeRecord.setIsAppliedLocally(true);
352 em.persist(mapChangeRecord);
353 tx.commit();
354 logger.info("Remote CR saved, it was from " + mapChangeRecord.getNodeID()
355 + " USN:" + mapChangeRecord.getOriginatingUSN()
356 + " Type:" + mapChangeRecord.getRecordType().name()
357 + " Key:" + mapChangeRecord.getEntityKey()
358 + " Local id from sender:" + mapChangeRecord.getId());
359 tx = em.getTransaction();
360 tx.begin();
361
362
363 if (rec.getChangeRecordDelete() != null) {
364 if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
365
366 UddiEntity ue = em.find(BindingTemplate.class, rec.getChangeRecordDelete().getBindingKey());
367 validateNodeIdMisMatches(ue, getNode());
368 pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
369 }
370 if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
371
372 UddiEntity ue = em.find(BusinessEntity.class, rec.getChangeRecordDelete().getBusinessKey());
373 validateNodeIdMisMatches(ue, getNode());
374 pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
375 }
376 if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
377 UddiEntity ue = em.find(BusinessService.class, rec.getChangeRecordDelete().getServiceKey());
378 validateNodeIdMisMatches(ue, getNode());
379
380 pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
381 }
382 if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
383
384
385
386
387
388
389
390
391
392
393
394 UddiEntity tm = em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey());
395 if (tm != null) {
396 validateNodeIdMisMatches(tm, getNode());
397 em.remove(tm);
398 } else {
399 logger.error("failed to adminstratively delete tmodel because it doesn't exist. " + rec.getChangeRecordDelete().getTModelKey());
400 }
401
402 }
403 }
404 if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
405
406 pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion(), em);
407 }
408
409
410
411 if (rec.getChangeRecordNewData() != null) {
412
413
414 if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
415 logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
416 } else {
417 if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
418 throw new Exception("Inbound replication data is missiong node id! Change will not be applied");
419 }
420 if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equalsIgnoreCase(getNode())) {
421 logger.warn("Inbound replication data is modifying locally owned data. This is not allowed, except for custody transfer");
422 }
423 if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
424
425
426
427
428
429 BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
430 if (model == null) {
431 logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
432 } else {
433 validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
434
435 org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey());
436 if (bt != null) {
437
438 em.remove(bt);
439 }
440 bt = new BindingTemplate();
441 MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), bt, model);
442 MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewData().getOperationalInfo());
443
444 em.persist(bt);
445 }
446
447 } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {
448
449 BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
450 if (model != null) {
451
452 if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
453 && !model.getNodeId().equals(getNode())) {
454 if (model.getIsTransferInProgress()) {
455
456 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
457 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
458 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
459 model.setIsTransferInProgress(false);
460 em.merge(model);
461 } else {
462
463 throw new Exception("Unexpected entity transfer to to node " + getNode() + " from " + rec.getChangeID().getNodeID());
464 }
465
466 } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
467 && model.getNodeId().equals(getNode())) {
468
469
470 throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
471 } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
472 && model.getNodeId().equals(getNode())) {
473
474 throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());
475
476 } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
477 && !model.getNodeId().equals(getNode())) {
478
479 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
480 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
481 em.merge(model);
482
483 }
484
485 } else {
486 model = new BusinessEntity();
487 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
488 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
489 em.persist(model);
490 }
491 }
492 if (rec.getChangeRecordNewData().getBusinessService() != null) {
493 BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
494 if (find == null) {
495 logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
496 } else {
497
498 org.apache.juddi.model.BusinessService model = null;
499 model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey());
500 if (model != null) {
501 validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
502 em.remove(model);
503 }
504
505 model = new org.apache.juddi.model.BusinessService();
506 MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
507 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
508 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
509
510 em.persist(model);
511 }
512
513 } else if (rec.getChangeRecordNewData().getTModel() != null) {
514
515 Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey());
516 if (model != null) {
517
518
519
520 if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
521 && !model.getNodeId().equals(getNode())) {
522 if (model.getIsTransferInProgress()) {
523
524 em.remove(model);
525 model = new Tmodel();
526 MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
527 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
528 model.setIsTransferInProgress(false);
529 em.persist(model);
530 } else {
531
532 throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID());
533 }
534
535 } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
536 && model.getNodeId().equals(getNode())) {
537
538
539 throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
540 } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
541 && model.getNodeId().equals(getNode())) {
542
543 throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());
544
545 } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
546 && !model.getNodeId().equals(getNode())) {
547
548 em.remove(model);
549 model = new Tmodel();
550 MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
551
552 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
553
554 em.persist(model);
555
556 }
557 } else {
558 model = new Tmodel();
559 MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
560
561 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
562
563 em.persist(model);
564 }
565 }
566
567 }
568
569 }
570
571
572
573
574
575 if (rec.getChangeRecordHide() != null) {
576
577
578
579
580
581 String key = rec.getChangeRecordHide().getTModelKey();
582 org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
583 if (existing == null) {
584 logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
585 } else {
586
587 validateNodeIdMisMatches(existing, getNode());
588 existing.setDeleted(true);
589 existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
590 existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
591 em.persist(existing);
592 }
593 }
594
595
596
597 if (rec.getChangeRecordPublisherAssertion() != null) {
598
599 logger.info("Repl CR Publisher Assertion");
600
601 PublisherAssertionId paid = new PublisherAssertionId(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey(), rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey());
602 org.apache.juddi.model.PublisherAssertion model = em.find(org.apache.juddi.model.PublisherAssertion.class, paid);
603 if (model != null) {
604 logger.info("Repl CR Publisher Assertion - Existing");
605
606 if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
607 model.setFromCheck("true");
608 } else {
609 model.setFromCheck("false");
610 }
611
612 if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
613 model.setToCheck("true");
614 } else {
615 model.setToCheck("false");
616 }
617
618 model.setKeyName(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyName());
619 model.setKeyValue(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyValue());
620 model.setTmodelKey(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getTModelKey());
621 model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());
622
623 if ("false".equalsIgnoreCase(model.getFromCheck())
624 && "false".equalsIgnoreCase(model.getToCheck())) {
625 logger.warn("!!!New publisher assertion is both false and false, strange. no need to save it then!");
626 em.remove(model);
627 }
628 em.merge(model);
629 } else {
630 logger.info("Repl CR Publisher Assertion - new PA");
631
632 model = new PublisherAssertion();
633 MappingApiToModel.mapPublisherAssertion(rec.getChangeRecordPublisherAssertion().getPublisherAssertion(), model);
634 model.setBusinessEntityByFromKey(null);
635 model.setBusinessEntityByToKey(null);
636 model.setBusinessEntityByFromKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey()));
637 model.setBusinessEntityByToKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey()));
638
639 if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
640 model.setFromCheck("true");
641 } else {
642 model.setFromCheck("false");
643 }
644
645 if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
646 model.setToCheck("true");
647 } else {
648 model.setToCheck("false");
649 }
650 model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());
651 em.persist(model);
652 }
653 }
654
655
656 if (rec.isAcknowledgementRequested()) {
657 ChangeRecord posack = new ChangeRecord();
658 posack.setChangeRecordAcknowledgement(new ChangeRecordAcknowledgement());
659 posack.getChangeRecordAcknowledgement().setAcknowledgedChange(rec.getChangeID());
660 posack.setAcknowledgementRequested(false);
661 ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(posack));
662 }
663 if (rec.getChangeRecordNewDataConditional() != null) {
664
665 if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
666 throw new Exception("Inbound replication data is missiong node id!");
667 }
668
669
670 if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo() == null) {
671 logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
672 } else {
673 if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate() != null) {
674
675
676
677
678
679 BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getServiceKey());
680 if (model == null) {
681 logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
682 } else {
683
684 org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getBindingKey());
685 if (bt != null) {
686 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), bt.getNodeId());
687
688 em.remove(bt);
689 }
690 bt = new BindingTemplate();
691 MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate(), bt, model);
692 MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
693
694 em.persist(bt);
695 }
696
697 } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity() != null) {
698
699 BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity().getBusinessKey());
700 if (model != null) {
701 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
702
703 em.remove(model);
704 }
705 model = new BusinessEntity();
706 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity(), model);
707
708
709 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
710 logger.warn("Name size on save is " + model.getBusinessNames().size());
711 em.persist(model);
712
713 }
714 if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService() != null) {
715 BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getBusinessKey());
716 if (find == null) {
717 logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
718 } else {
719
720 org.apache.juddi.model.BusinessService model = null;
721 model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getServiceKey());
722 if (model != null) {
723 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
724 em.remove(model);
725 }
726
727 model = new org.apache.juddi.model.BusinessService();
728 MappingApiToModel.mapBusinessService(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService(), model, find);
729 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
730 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
731
732 em.persist(model);
733 }
734
735 } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel() != null) {
736
737 Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().getTModelKey());
738 if (model != null) {
739 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
740 em.remove(model);
741 }
742 model = new Tmodel();
743 MappingApiToModel.mapTModel(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel(), model);
744
745 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
746
747 em.persist(model);
748 }
749
750 }
751
752 }
753 if (rec.getChangeRecordNull() != null) {
754
755
756 }
757 if (rec.getChangeRecordCorrection() != null) {
758
759
760 }
761 if (rec.getChangeRecordConditionFailed() != null) {
762
763
764 }
765 tx.commit();
766
767 } catch (Exception drfm) {
768
769 logger.warn("Error applying the change record! ", drfm);
770 StringWriter sw = new StringWriter();
771 JAXB.marshal(rec, sw);
772 logger.warn("This is the record that failed to persist: " + sw.toString());
773 if (tx.isActive()) {
774 tx.rollback();
775 }
776 if (mapChangeRecord != null) {
777
778 try {
779 tx = em.getTransaction();
780 tx.begin();
781 mapChangeRecord.setIsAppliedLocally(false);
782 em.merge(mapChangeRecord);
783 tx.commit();
784 } catch (Exception e) {
785 logger.error("error updating change record!!", e);
786 if (tx.isActive()) {
787 tx.rollback();
788 }
789 }
790 } else {
791 logger.fatal("whoa! change record is null when saving a remote change record, this is unexpected and should be reported");
792 }
793 } finally {
794 if (tx.isActive()) {
795 tx.rollback();
796 }
797 em.close();
798 }
799 }
800
801 private HighWaterMarkVectorType getLastChangeRecordFrom(String sourcenode) {
802 HighWaterMarkVectorType ret = new HighWaterMarkVectorType();
803 ChangeRecordIDType cid = new ChangeRecordIDType();
804 cid.setNodeID(sourcenode);
805 cid.setOriginatingUSN(0L);
806 EntityManager em = PersistenceManager.getEntityManager();
807 EntityTransaction tx = em.getTransaction();
808 try {
809 tx.begin();
810
811 try {
812 cid.setOriginatingUSN((Long) em.createQuery("select MAX(e.originatingUSN) from ChangeRecord e where e.nodeID = :node")
813 .setParameter("node", sourcenode)
814 .getSingleResult());
815 } catch (Exception ex) {
816 logger.info("unexpected error searching for last record from " + sourcenode, ex);
817 }
818
819 tx.rollback();
820
821 } catch (Exception drfm) {
822 logger.warn("error caught fetching newest record from node " + sourcenode, drfm);
823 } finally {
824 if (tx.isActive()) {
825 tx.rollback();
826 }
827 em.close();
828 }
829 logger.info("Highest known record for " + sourcenode + " is " + cid.getOriginatingUSN());
830 ret.getHighWaterMark().add(cid);
831
832 return ret;
833 }
834
835 private void enqueueAllReceivingNodes() {
836 if (queue == null) {
837 queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
838 }
839
840
841 ReplicationConfiguration repcfg = ReplicationNotifier.FetchEdges();
842 if (repcfg == null) {
843 return;
844 }
845 Set<String> allnodes = new HashSet<String>();
846 for (int i = 0; i < repcfg.getOperator().size(); i++) {
847 allnodes.add(repcfg.getOperator().get(i).getOperatorNodeID());
848 }
849 Set<String> receivers = new HashSet<String>();
850 if (repcfg.getCommunicationGraph() == null
851 || repcfg.getCommunicationGraph().getEdge().isEmpty()) {
852
853 for (org.uddi.repl_v3.Operator o : repcfg.getOperator()) {
854
855 if (!o.getOperatorNodeID().equalsIgnoreCase(getNode())) {
856 receivers.add(o.getOperatorNodeID());
857 }
858 }
859 } else {
860
861 Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
862 while (iterator.hasNext()) {
863 Edge next = iterator.next();
864
865 if (next.getMessageReceiver().equalsIgnoreCase(getNode())) {
866 receivers.add(next.getMessageSender());
867 }
868
869 }
870
871 }
872 for (String s : receivers) {
873
874
875 for (String nodeping : allnodes) {
876 queue.add(new NotifyChangeRecordsAvailable(s, getLastChangeRecordFrom(nodeping)));
877
878
879 }
880
881 }
882 }
883
884 }
885
886
887
888
889
890
891
892
893
894 private static void validateNodeIdMisMatches(UddiEntity ue, String node) throws Exception {
895 if (ue == null) {
896 return;
897 }
898 if (ue.getNodeId().equals(node)) {
899 throw new Exception("Alert! attempt to alter locally owned entity " + ue.getEntityKey() + " owned by " + ue.getAuthorizedName() + "@" + ue.getNodeId());
900 }
901 }
902
903
904
905
906
907
908
909
910
911 private void validateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception {
912 if (newNodeId == null || currentOwningNode == null) {
913 throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
914 }
915
916 if (!newNodeId.equals(currentOwningNode)) {
917 logger.info("AUDIT, custody transfer from node, " + currentOwningNode + " to " + newNodeId + " current node is " + getNode());
918
919 }
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937 if (newNodeId.equals(getNode())) {
938
939 }
940 }
941
942 private synchronized UDDIReplicationPortType getReplicationClient(String node) {
943 if (cache.containsKey(node)) {
944 return cache.get(node);
945 }
946 UDDIService svc = new UDDIService();
947 UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
948 TransportSecurityHelper.applyTransportSecurity((BindingProvider) replicationClient);
949
950 EntityManager em = PersistenceManager.getEntityManager();
951 EntityTransaction tx = em.getTransaction();
952 try {
953 tx.begin();
954 StringBuilder sql = new StringBuilder();
955 sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc");
956
957 Query qry = em.createQuery(sql.toString());
958 qry.setMaxResults(1);
959
960 org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
961 for (Operator o : resultList.getOperator()) {
962 if (o.getOperatorNodeID().equalsIgnoreCase(node)) {
963 ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());
964 cache.put(node, replicationClient);
965 return replicationClient;
966 }
967 }
968 tx.rollback();
969
970 } catch (Exception ex) {
971 logger.fatal("Node not found!" + node, ex);
972 } finally {
973 if (tx.isActive()) {
974 tx.rollback();
975 }
976 em.close();
977 }
978
979 return null;
980
981 }
982 private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
983
984
985
986
987
988
989
990 public String doPing(DoPing body) throws DispositionReportFaultMessage {
991 long startTime = System.currentTimeMillis();
992 long procTime = System.currentTimeMillis() - startTime;
993 serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
994
995 return getNode();
996
997 }
998
999 @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
1000 @WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body")
1001
1002 @Override
1003 public org.uddi.repl_v3.ChangeRecords getChangeRecords(
1004 @WebParam(partName = "body", name = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3") org.uddi.repl_v3.GetChangeRecords body
1005 ) throws DispositionReportFaultMessage, RemoteException {
1006 long startTime = System.currentTimeMillis();
1007 String requestingNode = body.getRequestingNode();
1008 HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen();
1009 BigInteger responseLimitCount = body.getResponseLimitCount();
1010 HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector();
1011
1012 new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
1013
1014
1015 List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
1016 EntityManager em = PersistenceManager.getEntityManager();
1017 EntityTransaction tx = em.getTransaction();
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036 try {
1037 int maxrecords = AppConfig.getConfiguration().getInt(Property.JUDDI_REPLICATION_GET_CHANGE_RECORDS_MAX, 100);
1038 if (responseLimitCount != null) {
1039 maxrecords = responseLimitCount.intValue();
1040 }
1041 tx.begin();
1042 Long firstrecord = 0L;
1043 Long lastrecord = null;
1044 Query createQuery = null;
1045
1046 if (changesAlreadySeen != null) {
1047
1048
1049 for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
1050 firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN();
1051 if (firstrecord == null) {
1052 firstrecord = 0L;
1053 }
1054 if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(getNode())) {
1055
1056 createQuery = em.createQuery("select e from ChangeRecord e where "
1057 + "(e.id > :inbound AND e.nodeID = :node) "
1058 + "order by e.id ASC");
1059
1060 } else {
1061 createQuery = em.createQuery("select e from ChangeRecord e where "
1062 + "e.originatingUSN > :inbound AND e.nodeID = :node "
1063 + "order by e.originatingUSN ASC");
1064 }
1065 logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
1066 logger.info("This node is " + getNode() + ", request is for data originated from " + changesAlreadySeen.getHighWaterMark().get(i).getNodeID() + " and it's being sent back to " + requestingNode);
1067
1068 createQuery.setMaxResults(maxrecords);
1069 createQuery.setParameter("inbound", firstrecord);
1070 createQuery.setParameter("node", changesAlreadySeen.getHighWaterMark().get(i).getNodeID());
1071 List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
1072 logger.info(records.size() + " CR records returned from query");
1073 for (int x = 0; x < records.size(); x++) {
1074 ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(x));
1075
1076 ret.add(r);
1077
1078
1079 }
1080 }
1081 }
1082
1083
1084
1085
1086
1087
1088
1089 else {
1090 if (firstrecord == null) {
1091 firstrecord = 0L;
1092 }
1093
1094 logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
1095 logger.info("This node is " + getNode() + " requesting node " + requestingNode);
1096
1097 if (lastrecord != null) {
1098 createQuery = em.createQuery("select e from ChangeRecord e where "
1099 + "(e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) "
1100 + "order by e.id ASC");
1101 createQuery.setParameter("lastrecord", lastrecord);
1102 } else {
1103 createQuery = em.createQuery("select e from ChangeRecord e where "
1104 + "(e.id > :inbound AND e.nodeID = :node) "
1105 + "order by e.id ASC");
1106 }
1107 createQuery.setMaxResults(maxrecords);
1108 createQuery.setParameter("inbound", firstrecord);
1109 createQuery.setParameter("node", getNode());
1110
1111 List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
1112 logger.info(records.size() + " CR records returned from query");
1113 for (int i = 0; i < records.size(); i++) {
1114 ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
1115
1116 ret.add(r);
1117
1118
1119 }
1120 }
1121 tx.rollback();
1122 long procTime = System.currentTimeMillis() - startTime;
1123 serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
1124 QueryStatus.SUCCESS, procTime);
1125
1126 } catch (Exception ex) {
1127 logger.fatal("Error, this node is: " + getNode(), ex);
1128 throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
1129
1130 } finally {
1131 if (tx.isActive()) {
1132 tx.rollback();
1133 }
1134 em.close();
1135 }
1136 logger.info("Change records returned for " + requestingNode + ": " + ret.size());
1137
1138 ChangeRecords x = new ChangeRecords();
1139 x.getChangeRecord().addAll(ret);
1140
1141 return x;
1142 }
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153 @Override
1154 public List<ChangeRecordIDType> getHighWaterMarks()
1155 throws DispositionReportFaultMessage {
1156 long startTime = System.currentTimeMillis();
1157
1158 List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
1159
1160
1161 ReplicationConfiguration FetchEdges = FetchEdges();
1162
1163 EntityManager em = PersistenceManager.getEntityManager();
1164 EntityTransaction tx = em.getTransaction();
1165 HashMap<String, Long> map = new HashMap<String, Long>();
1166 try {
1167 tx.begin();
1168 if (FetchEdges != null) {
1169 Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
1170 while (it.hasNext()) {
1171 String nextNode = it.next();
1172 if (!nextNode.equals(getNode())) {
1173 if (!map.containsKey(nextNode)) {
1174 Long id = 0L;
1175 try {
1176 id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
1177 } catch (Exception ex) {
1178 logger.debug(ex);
1179 }
1180 if (id == null) {
1181 id = 0L;
1182
1183 }
1184 map.put(nextNode, id);
1185
1186 }
1187 }
1188 }
1189 }
1190
1191 Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc")
1192 .setParameter("node", getNode()).setMaxResults(1).getSingleResult();
1193 if (id == null) {
1194 id = 0L;
1195 }
1196 ChangeRecordIDType x = new ChangeRecordIDType();
1197 x.setNodeID(getNode());
1198 x.setOriginatingUSN(id);
1199 ret.add(x);
1200
1201 tx.rollback();
1202 long procTime = System.currentTimeMillis() - startTime;
1203 serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
1204
1205 } catch (Exception drfm) {
1206 throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));
1207
1208 } finally {
1209 if (tx.isActive()) {
1210 tx.rollback();
1211 }
1212 em.close();
1213 }
1214
1215 Iterator<Map.Entry<String, Long>> iterator = map.entrySet().iterator();
1216 while (iterator.hasNext()) {
1217 Map.Entry<String, Long> next = iterator.next();
1218 ret.add(new ChangeRecordIDType(next.getKey(), next.getValue()));
1219 }
1220 return ret;
1221 }
1222
1223
1224
1225
1226
1227
1228
1229
1230 @Override
1231 public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
1232 throws DispositionReportFaultMessage {
1233 long startTime = System.currentTimeMillis();
1234
1235
1236
1237 new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
1238
1239 logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size() + " this node is " + getNode());
1240
1241 queue.add(body);
1242
1243 long procTime = System.currentTimeMillis() - startTime;
1244 serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
1245 QueryStatus.SUCCESS, procTime);
1246 }
1247 private static Queue<NotifyChangeRecordsAvailable> queue = null;
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263 @Override
1264 public void transferCustody(TransferCustody body)
1265 throws DispositionReportFaultMessage {
1266 long startTime = System.currentTimeMillis();
1267 EntityManager em = PersistenceManager.getEntityManager();
1268 EntityTransaction tx = em.getTransaction();
1269 logger.info("Inbound transfer request (via replication api, node to node");
1270 try {
1271 tx.begin();
1272
1273
1274
1275
1276
1277
1278 boolean ok = false;
1279 ReplicationConfiguration FetchEdges = ReplicationNotifier.FetchEdges();
1280 if (FetchEdges != null) {
1281 for (int i = 0; i < FetchEdges.getOperator().size(); i++) {
1282
1283 if (FetchEdges.getOperator().get(i).getOperatorNodeID().equals(body.getTransferOperationalInfo().getNodeID())) {
1284 ok = true;
1285 break;
1286 }
1287 }
1288 }
1289 if (!ok) {
1290 throw new TransferNotAllowedException(new ErrorMessage("E_transferNotAllowedUnknownNode"));
1291 }
1292
1293 new ValidateReplication(null).validateTransfer(em, body);
1294
1295 TransferEntities te = new TransferEntities();
1296 te.setKeyBag(body.getKeyBag());
1297 te.setTransferToken(body.getTransferToken());
1298 te.setAuthInfo(null);
1299
1300
1301
1302 logger.debug("request validated, processing transfer");
1303 List<ChangeRecord> executeTransfer = new UDDICustodyTransferImpl().executeTransfer(te, em, body.getTransferOperationalInfo().getAuthorizedName(), body.getTransferOperationalInfo().getNodeID());
1304
1305 for (ChangeRecord c : executeTransfer) {
1306 try {
1307 c.setChangeID(new ChangeRecordIDType());
1308 c.getChangeID().setNodeID(getNode());
1309 c.getChangeID().setOriginatingUSN(null);
1310 ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(c));
1311 } catch (UnsupportedEncodingException ex) {
1312 logger.error("", ex);
1313 }
1314 }
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353 tx.commit();
1354 long procTime = System.currentTimeMillis() - startTime;
1355 serviceCounter.update(ReplicationQuery.TRANSFER_CUSTODY,
1356 QueryStatus.SUCCESS, procTime);
1357 } catch (DispositionReportFaultMessage d) {
1358 logger.error("Unable to process node to node custody transfer ", d);
1359 throw d;
1360 } finally {
1361 if (em != null && em.isOpen()) {
1362 em.close();
1363 }
1364 if (tx.isActive()) {
1365 tx.rollback();
1366 }
1367 }
1368 }
1369
1370 }