1/*2 * Copyright 2001-2008 The Apache Software Foundation.3 * 4 * Licensed under the Apache License, Version 2.0 (the "License");5 * you may not use this file except in compliance with the License.6 * You may obtain a copy of the License at7 * 8 * http://www.apache.org/licenses/LICENSE-2.09 * 10 * Unless required by applicable law or agreed to in writing, software11 * distributed under the License is distributed on an "AS IS" BASIS,12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13 * See the License for the specific language governing permissions and14 * limitations under the License.15 *16 */17package org.apache.juddi.api.impl;
1819import java.io.StringWriter;
20import java.io.UnsupportedEncodingException;
21import java.math.BigInteger;
22import java.rmi.RemoteException;
23import java.util.ArrayList;
24import java.util.HashMap;
25import java.util.HashSet;
26import java.util.Iterator;
27import java.util.List;
28import java.util.Map;
29import java.util.Queue;
30import java.util.Set;
31import java.util.Timer;
32import java.util.TimerTask;
33import java.util.concurrent.ConcurrentLinkedQueue;
34import javax.jws.WebParam;
35import javax.jws.WebResult;
36import javax.jws.WebService;
37import javax.jws.soap.SOAPBinding;
38import javax.persistence.EntityManager;
39import javax.persistence.EntityTransaction;
40import javax.persistence.Query;
41import javax.xml.bind.JAXB;
42import javax.xml.bind.annotation.XmlSeeAlso;
43import javax.xml.ws.BindingProvider;
44import org.apache.juddi.api.util.QueryStatus;
45import org.apache.juddi.api.util.ReplicationQuery;
46import org.apache.juddi.config.AppConfig;
47import org.apache.juddi.config.PersistenceManager;
48import org.apache.juddi.config.Property;
49import org.apache.juddi.mapping.MappingApiToModel;
50import org.apache.juddi.mapping.MappingModelToApi;
51import org.apache.juddi.model.BindingTemplate;
52import org.apache.juddi.model.BusinessEntity;
53import org.apache.juddi.model.BusinessService;
54import org.apache.juddi.model.Operator;
55import org.apache.juddi.model.PublisherAssertion;
56import org.apache.juddi.model.PublisherAssertionId;
57import org.apache.juddi.model.Tmodel;
58import org.apache.juddi.model.UddiEntity;
59import org.apache.juddi.replication.ReplicationNotifier;
60importstatic org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
61import org.apache.juddi.v3.client.UDDIService;
62import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
63import org.apache.juddi.v3.error.ErrorMessage;
64import org.apache.juddi.v3.error.FatalErrorException;
65import org.apache.juddi.v3.error.TransferNotAllowedException;
66import org.apache.juddi.validation.ValidateReplication;
67import org.uddi.custody_v3.TransferEntities;
68import org.uddi.repl_v3.ChangeRecord;
69import org.uddi.repl_v3.ChangeRecordAcknowledgement;
70import org.uddi.repl_v3.ChangeRecordIDType;
71import org.uddi.repl_v3.ChangeRecords;
72import org.uddi.repl_v3.CommunicationGraph.Edge;
73import org.uddi.repl_v3.DoPing;
74import org.uddi.repl_v3.GetChangeRecords;
75import org.uddi.repl_v3.HighWaterMarkVectorType;
76import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
77import org.uddi.repl_v3.ReplicationConfiguration;
78import org.uddi.repl_v3.TransferCustody;
79import org.uddi.v3_service.DispositionReportFaultMessage;
80import org.uddi.v3_service.UDDIReplicationPortType;
8182/**83 * UDDI Replication defines four APIs. The first two presented here are used to84 * perform replication and issue notifications. The latter ancillary APIs85 * provide support for other aspects of UDDI Replication.86 * <ul>87 * <li>get_changeRecords</li>88 * <li>notify_changeRecordsAvailable</li>89 * <li>do_ping</li>90 * <li>get_highWaterMarks</li></ul>91 *92 * @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>93 */94 @WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:api_v3_portType",
95 endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType")
96 @XmlSeeAlso({
97 org.uddi.custody_v3.ObjectFactory.class,
98 org.uddi.repl_v3.ObjectFactory.class,
99 org.uddi.subr_v3.ObjectFactory.class,
100 org.uddi.api_v3.ObjectFactory.class,
101 org.uddi.vscache_v3.ObjectFactory.class,
102 org.uddi.vs_v3.ObjectFactory.class,
103 org.uddi.sub_v3.ObjectFactory.class,
104 org.w3._2000._09.xmldsig_.ObjectFactory.class,
105 org.uddi.policy_v3.ObjectFactory.class,
106 org.uddi.policy_v3_instanceparms.ObjectFactory.class107 })
108publicclassUDDIReplicationImplextendsAuthenticatedServiceimplements UDDIReplicationPortType {
109110staticvoid notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig, AuthenticatedService service) {
111112//if the config is different113 Set<String> oldnodes = getNodes(oldConfig);
114 Set<String> newNodes = getNodes(newConfig);
115116 Set<String> addedNodes = diffNodeList(oldnodes, newNodes);
117if (queue == null) {
118 queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
119 }
120for (String s : addedNodes) {
121if (!s.equals(service.getNode())) {
122 logger.info("This node: " + service.getNode() + ". New replication node queue for synchronization: " + s);
123 HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();
124 highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L));
125 queue.add(new NotifyChangeRecordsAvailable(s, highWaterMarkVectorType));
126 }
127 }
128129 }
130131privatestatic Set<String> getNodes(ReplicationConfiguration oldConfig) {
132 Set<String> ret = new HashSet<String>();
133if (oldConfig == null) {
134return ret;
135 }
136for (org.uddi.repl_v3.Operator o : oldConfig.getOperator()) {
137 ret.add(o.getOperatorNodeID());
138 }
139if (oldConfig.getCommunicationGraph() != null) {
140 ret.addAll(oldConfig.getCommunicationGraph().getNode());
141 }
142return ret;
143 }
144145/**146 * returns items in "newNodes" that are not in "oldNodes"147 *148 * @param oldnodes149 * @param newNodes150 * @return151 */152privatestatic Set<String> diffNodeList(Set<String> oldnodes, Set<String> newNodes) {
153 Set<String> diff = new HashSet<String>();
154 Iterator<String> iterator = newNodes.iterator();
155while (iterator.hasNext()) {
156 String lhs = iterator.next();
157 Iterator<String> iterator1 = oldnodes.iterator();
158boolean found = false;
159while (iterator1.hasNext()) {
160 String rhs = iterator1.next();
161if (rhs.equalsIgnoreCase(lhs)) {
162 found = true;
163break;
164 }
165 }
166if (!found) {
167 diff.add(lhs);
168 }
169170 }
171return diff;
172 }
173174privateUDDIServiceCounter serviceCounter;
175176privatestaticPullTimerTask timer = null;
177privatelong startBuffer;
178privatelong interval;
179180privatestaticUDDIPublicationImpl pub = null;
181182publicUDDIReplicationImpl() {
183super();
184try {
185this.interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000L);
186this.startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000L);
187 } catch (Exception ex) {
188 logger.warn("Config error!", ex);
189 }
190191 serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
192 init();
193194 }
195196privatesynchronizedvoid init() {
197if (pub == null) {
198 pub = newUDDIPublicationImpl();
199 }
200if (queue == null) {
201 queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
202 }
203 timer = newPullTimerTask();
204205 }
206207208/**209 * handles when a remote node tells me that there's an update(s)210 * available211 */212privateclassPullTimerTaskextends TimerTask {
213214private Timer timer = null;
215216publicPullTimerTask() {
217super();
218 timer = new Timer(true);
219 timer.scheduleAtFixedRate(this, startBuffer, interval);
220 }
221boolean firstrun = true;
222223 @Override
224publicvoid run() {
225if (firstrun) {
226 enqueueAllReceivingNodes();
227 firstrun = false;
228 }
229230if (!queue.isEmpty()) {
231 logger.info("Replication change puller thread started. Queue size: " + queue.size());
232 }
233//ok someone told me there's a change available234while (!queue.isEmpty()) {
235 NotifyChangeRecordsAvailable poll = queue.poll();
236if (poll != null && !poll.getNotifyingNode().equalsIgnoreCase(getNode())) {
237 UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
238if (replicationClient == null) {
239 logger.fatal("unable to obtain a replication client to node " + poll);
240 } else {
241try {
242//get the high water marks for this node243//ok now get all the changes244245//done replace with last known record from the given node246//for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {247// logger.info("Node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()248// + " USN " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN());249//}250 Set<String> nodesHitThisCycle = new HashSet<String>();
251for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
252int recordsreturned = 21;
253while (recordsreturned >= 20) {
254if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())) {
255 logger.info("i've already hit the node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + " this cycle, skipping");
256break;
257 }
258if (poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID().equalsIgnoreCase(getNode())) {
259 logger.info("ignoring updates that were generated here " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN() + " sent by " + poll.getNotifyingNode() + " this node is " + getNode());
260break;
261 }
262 nodesHitThisCycle.add(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID());
263 GetChangeRecords body = new GetChangeRecords();
264 body.setRequestingNode(getNode());
265 body.setResponseLimitCount(BigInteger.valueOf(100L));
266267 body.setChangesAlreadySeen(getLastChangeRecordFrom(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()));
268 logger.info("fetching updates from " + poll.getNotifyingNode() + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getNodeID() + ":" + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + ", items still in the queue: " + queue.size());
269//JAXB.marshal(body, System.out);270 List<ChangeRecord> records
271 = replicationClient.getChangeRecords(body).getChangeRecord();
272//ok now we need to persist the change records273 logger.info("Change records retrieved from " + poll.getNotifyingNode() + ", " + records.size());
274for (int i = 0; i < records.size(); i++) {
275 logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN());
276 persistChangeRecord(records.get(i));
277 }
278 recordsreturned = records.size();
279 }
280 }
281 } catch (Exception ex) {
282 logger.error("Error caught fetching replication changes from " + poll + " @" + ((BindingProvider) replicationClient).getRequestContext().get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY), ex);
283 }
284 }
285 } else {
286if (poll == null) {
287 logger.warn("strange, popped a null object");
288 } elseif (poll.getNotifyingNode().equalsIgnoreCase(getNode())) {
289 logger.warn("strange, popped an object from the queue but it was from myself. This probably indicates a configuration error! ignoring...first record: " + poll.getChangesAvailable().getHighWaterMark().get(0).getNodeID()+":" + poll.getChangesAvailable().getHighWaterMark().get(0).getOriginatingUSN());
290 }
291 }
292 }
293 }
294295 @Override
296publicboolean cancel() {
297 timer.cancel();
298returnsuper.cancel();
299 }
300301/**302 * someone told me there's a change available, we retrieved it303 * and are processing the changes locally.304 *305 * @param rec306 */307privatevoid persistChangeRecord(ChangeRecord rec) {
308if (rec == null) {
309return;
310 }
311 logger.debug("_______________________Remote change request " + rec.getChangeID().getNodeID() + ":" + rec.getChangeID().getOriginatingUSN());
312313if (rec.getChangeID().getNodeID().equalsIgnoreCase(getNode())) {
314 logger.info("Just received a change record that i created, ignoring....");
315return;
316 }
317 EntityManager em = PersistenceManager.getEntityManager();
318 EntityTransaction tx = em.getTransaction();
319 org.apache.juddi.model.ChangeRecord mapChangeRecord = null;
320/**321 * In nodes that support pre-bundled replication322 * responses, the recipient of the get_changeRecords323 * message MAY return more change records than requested324 * by the caller. In this scenario, the caller MUST also325 * be prepared to deal with such redundant changes where326 * a USN is less than the USN specified in the327 * changesAlreadySeen highWaterMarkVector.328 */329330try {
331 tx.begin();
332//check to see if we have this update already333 Query createQuery = em.createQuery("select c from ChangeRecord c where c.nodeID=:node and c.originatingUSN=:oid");
334 createQuery.setParameter("node", rec.getChangeID().getNodeID());
335 createQuery.setParameter("oid", rec.getChangeID().getOriginatingUSN());
336 Object existingrecord = null;
337try {
338 existingrecord = createQuery.getSingleResult();
339 } catch (Exception ex) {
340 logger.debug("error checking to see if change record exists already (expected failure)", ex);
341 }
342if (existingrecord != null) {
343 logger.info("I've already processed change record " + rec.getChangeID().getNodeID() + " " + rec.getChangeID().getOriginatingUSN());
344return;
345 }
346//if it didn't come from here and i haven't seen it yet347 ReplicationNotifier.EnqueueRetransmit(rec);
348//the remotechange record rec must also be persisted!!349 mapChangeRecord = MappingApiToModel.mapChangeRecord(rec);
350 mapChangeRecord.setId(null);
351 mapChangeRecord.setIsAppliedLocally(true);
352 em.persist(mapChangeRecord);
353 tx.commit();
354 logger.info("Remote CR saved, it was from " + mapChangeRecord.getNodeID() //this is the origin of the change355 + " USN:" + mapChangeRecord.getOriginatingUSN()
356 + " Type:" + mapChangeRecord.getRecordType().name()
357 + " Key:" + mapChangeRecord.getEntityKey()
358 + " Local id from sender:" + mapChangeRecord.getId());
359 tx = em.getTransaction();
360 tx.begin();
361//<editor-fold defaultstate="collapsed" desc="delete a record">362363if (rec.getChangeRecordDelete() != null) {
364if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
365//delete a binding template366UddiEntity ue = em.find(BindingTemplate.class, rec.getChangeRecordDelete().getBindingKey());
367 validateNodeIdMisMatches(ue, getNode());
368 pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
369 }
370if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
371//delete a business 372UddiEntity ue = em.find(BusinessEntity.class, rec.getChangeRecordDelete().getBusinessKey());
373 validateNodeIdMisMatches(ue, getNode());
374 pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
375 }
376if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
377UddiEntity ue = em.find(BusinessService.class, rec.getChangeRecordDelete().getServiceKey());
378 validateNodeIdMisMatches(ue, getNode());
379//delete a service 380 pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
381 }
382if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
383//delete a tmodel 384/**385 * The changeRecordDelete for a386 * tModel does not correspond to387 * any API described in this388 * specification and should only389 * appear in the replication390 * stream as the result of an391 * administrative function to392 * permanently remove a tModel.393 */394UddiEntity tm = em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey());
395if (tm != null) {
396 validateNodeIdMisMatches(tm, getNode());
397 em.remove(tm);
398 } else {
399 logger.error("failed to adminstratively delete tmodel because it doesn't exist. " + rec.getChangeRecordDelete().getTModelKey());
400 }
401//pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);402 }
403 }
404if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
405//delete a pa template 406 pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion(), em);
407 }
408409//</editor-fold>410//<editor-fold defaultstate="collapsed" desc="New Data">411if (rec.getChangeRecordNewData() != null) {
412413//The operationalInfo element MUST contain the operational information associated with the indicated new data.414if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
415 logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
416 } else {
417if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
418thrownew Exception("Inbound replication data is missiong node id! Change will not be applied");
419 }
420if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equalsIgnoreCase(getNode())) {
421 logger.warn("Inbound replication data is modifying locally owned data. This is not allowed, except for custody transfer");
422 }
423if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
424//fetch the binding template if it exists already425//if it exists, 426// confirm the owning node, it shouldn't be the local node id, if it is, throw427// the owning node should be the same as it was before428429BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
430if (model == null) {
431 logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
432 } else {
433 validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
434435 org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey());
436if (bt != null) {
437//ValidateNodeIdMatches(node, bt.getNodeId());438 em.remove(bt);
439 }
440 bt = newBindingTemplate();
441 MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), bt, model);
442 MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewData().getOperationalInfo());
443// MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());444 em.persist(bt);
445 }
446447 } elseif (rec.getChangeRecordNewData().getBusinessEntity() != null) {
448449BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
450if (model != null) {
451//if the owner of the new data is me, and the update didn't originate from me452if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
453 && !model.getNodeId().equals(getNode())) {
454if (model.getIsTransferInProgress()) {
455//allow the transfer456 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
457 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
458 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
459 model.setIsTransferInProgress(false);
460 em.merge(model);
461 } else {
462//block it, unexpected transfer463thrownew Exception("Unexpected entity transfer to to node " + getNode() + " from " + rec.getChangeID().getNodeID());
464 }
465466 } elseif (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
467 && model.getNodeId().equals(getNode())) {
468//if destination is here and it's staying here, then this is strange also469//someone else updated one of my records470thrownew Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
471 } elseif (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
472 && model.getNodeId().equals(getNode())) {
473//this is also strange, destination is elsewhere however it's owned by me.474thrownew Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());
475476 } elseif (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
477 && !model.getNodeId().equals(getNode())) {
478//changes on a remote node, for an existing item479 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
480 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
481 em.merge(model);
482483 }
484485 } else {
486 model = newBusinessEntity();
487 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
488 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
489 em.persist(model);
490 }
491 }
492if (rec.getChangeRecordNewData().getBusinessService() != null) {
493BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
494if (find == null) {
495 logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
496 } else {
497498 org.apache.juddi.model.BusinessService model = null;
499 model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey());
500if (model != null) {
501 validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
502 em.remove(model);
503 }
504505 model = new org.apache.juddi.model.BusinessService();
506 MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
507 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
508 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
509510 em.persist(model);
511 }
512513 } elseif (rec.getChangeRecordNewData().getTModel() != null) {
514515Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey());
516if (model != null) {
517//in the case of a transfer518//if the new entity is being transfer to ME, accept and i didn't previously own it, but only if the local record is flagged as transferable519//meaning, only accept if i'm expecting a transfer520if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
521 && !model.getNodeId().equals(getNode())) {
522if (model.getIsTransferInProgress()) {
523//allow the transfer524 em.remove(model);
525 model = newTmodel();
526 MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
527 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
528 model.setIsTransferInProgress(false);
529 em.persist(model);
530 } else {
531//block it, unexpected transfer532thrownew Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID());
533 }
534535 } elseif (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
536 && model.getNodeId().equals(getNode())) {
537//if destination is here and it's staying here, then this is strange also538//someone else updated one of my records539thrownew Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
540 } elseif (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
541 && model.getNodeId().equals(getNode())) {
542//this is also strange, destination is elsewhere however it's owned by me.543thrownew Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());
544545 } elseif (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
546 && !model.getNodeId().equals(getNode())) {
547//changes on a remote node, for an existing item548 em.remove(model);
549 model = newTmodel();
550 MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
551552 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
553554 em.persist(model);
555556 }
557 } else {
558 model = newTmodel();
559 MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
560561 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
562563 em.persist(model);
564 }
565 }
566567 }
568569 }
570//</editor-fold>571572// changeRecordNull no action needed573// changeRecordHide tmodel only574//<editor-fold defaultstate="collapsed" desc="hide tmodel">575if (rec.getChangeRecordHide() != null) {
576/*577 A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification. A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.578579 The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.580 */581 String key = rec.getChangeRecordHide().getTModelKey();
582 org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
583if (existing == null) {
584 logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
585 } else {
586//no one else can delete/hide my tmodel587 validateNodeIdMisMatches(existing, getNode());
588 existing.setDeleted(true);
589 existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
590 existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
591 em.persist(existing);
592 }
593 }
594//</editor-fold>595596//<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">597if (rec.getChangeRecordPublisherAssertion() != null) {
598599 logger.info("Repl CR Publisher Assertion");
600//TODO are publisher assertions owned by a given node?601PublisherAssertionId paid = newPublisherAssertionId(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey(), rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey());
602 org.apache.juddi.model.PublisherAssertion model = em.find(org.apache.juddi.model.PublisherAssertion.class, paid);
603if (model != null) {
604 logger.info("Repl CR Publisher Assertion - Existing");
605606if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
607 model.setFromCheck("true");
608 } else {
609 model.setFromCheck("false");
610 }
611612if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
613 model.setToCheck("true");
614 } else {
615 model.setToCheck("false");
616 }
617618 model.setKeyName(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyName());
619 model.setKeyValue(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyValue());
620 model.setTmodelKey(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getTModelKey());
621 model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());
622//model.setSignatures(MappingApiToModel.mapApiSignaturesToModelSignatures(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getSignature()));623if ("false".equalsIgnoreCase(model.getFromCheck())
624 && "false".equalsIgnoreCase(model.getToCheck())) {
625 logger.warn("!!!New publisher assertion is both false and false, strange. no need to save it then!");
626 em.remove(model);
627 }
628 em.merge(model);
629 } else {
630 logger.info("Repl CR Publisher Assertion - new PA");
631632 model = newPublisherAssertion();
633 MappingApiToModel.mapPublisherAssertion(rec.getChangeRecordPublisherAssertion().getPublisherAssertion(), model);
634 model.setBusinessEntityByFromKey(null);
635 model.setBusinessEntityByToKey(null);
636 model.setBusinessEntityByFromKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey()));
637 model.setBusinessEntityByToKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey()));
638639if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
640 model.setFromCheck("true");
641 } else {
642 model.setFromCheck("false");
643 }
644645if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
646 model.setToCheck("true");
647 } else {
648 model.setToCheck("false");
649 }
650 model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());
651 em.persist(model);
652 }
653 }
654//</editor-fold>655656if (rec.isAcknowledgementRequested()) {
657ChangeRecord posack = newChangeRecord();
658 posack.setChangeRecordAcknowledgement(new ChangeRecordAcknowledgement());
659 posack.getChangeRecordAcknowledgement().setAcknowledgedChange(rec.getChangeID());
660 posack.setAcknowledgementRequested(false);
661 ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(posack));
662 }
663if (rec.getChangeRecordNewDataConditional() != null) {
664665if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
666thrownew Exception("Inbound replication data is missiong node id!");
667 }
668669//The operationalInfo element MUST contain the operational information associated with the indicated new data.670if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo() == null) {
671 logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
672 } else {
673if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate() != null) {
674//fetch the binding template if it exists already675//if it exists, 676// confirm the owning node, it shouldn't be the local node id, if it is, throw677// the owning node should be the same as it was before678679BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getServiceKey());
680if (model == null) {
681 logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
682 } else {
683684 org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getBindingKey());
685if (bt != null) {
686 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), bt.getNodeId());
687688 em.remove(bt);
689 }
690 bt = newBindingTemplate();
691 MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate(), bt, model);
692 MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
693// MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());694 em.persist(bt);
695 }
696697 } elseif (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity() != null) {
698699BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity().getBusinessKey());
700if (model != null) {
701 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
702//TODO revisit access control rules703 em.remove(model);
704 }
705 model = newBusinessEntity();
706 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity(), model);
707// MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());708709 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
710 logger.warn("Name size on save is " + model.getBusinessNames().size());
711 em.persist(model);
712713 }
714if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService() != null) {
715BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getBusinessKey());
716if (find == null) {
717 logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
718 } else {
719720 org.apache.juddi.model.BusinessService model = null;
721 model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getServiceKey());
722if (model != null) {
723 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
724 em.remove(model);
725 }
726727 model = new org.apache.juddi.model.BusinessService();
728 MappingApiToModel.mapBusinessService(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService(), model, find);
729 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
730 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
731732 em.persist(model);
733 }
734735 } elseif (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel() != null) {
736737Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().getTModelKey());
738if (model != null) {
739 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
740 em.remove(model);
741 }
742 model = newTmodel();
743 MappingApiToModel.mapTModel(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel(), model);
744745 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
746747 em.persist(model);
748 }
749750 }
751752 }
753if (rec.getChangeRecordNull() != null) {
754//No action required755756 }
757if (rec.getChangeRecordCorrection() != null) {
758//TODO implement759760 }
761if (rec.getChangeRecordConditionFailed() != null) {
762//TODO implement763764 }
765 tx.commit();
766767 } catch (Exception drfm) {
768769 logger.warn("Error applying the change record! ", drfm);
770 StringWriter sw = new StringWriter();
771 JAXB.marshal(rec, sw);
772 logger.warn("This is the record that failed to persist: " + sw.toString());
773if (tx.isActive()) {
774 tx.rollback();
775 }
776if (mapChangeRecord != null) {
777//set the change record's isApplied to false778try {
779 tx = em.getTransaction();
780 tx.begin();
781 mapChangeRecord.setIsAppliedLocally(false);
782 em.merge(mapChangeRecord);
783 tx.commit();
784 } catch (Exception e) {
785 logger.error("error updating change record!!", e);
786if (tx.isActive()) {
787 tx.rollback();
788 }
789 }
790 } else {
791 logger.fatal("whoa! change record is null when saving a remote change record, this is unexpected and should be reported");
792 }
793 } finally {
794if (tx.isActive()) {
795 tx.rollback();
796 }
797 em.close();
798 }
799 }
800801private HighWaterMarkVectorType getLastChangeRecordFrom(String sourcenode) {
802 HighWaterMarkVectorType ret = new HighWaterMarkVectorType();
803 ChangeRecordIDType cid = new ChangeRecordIDType();
804 cid.setNodeID(sourcenode);
805 cid.setOriginatingUSN(0L);
806 EntityManager em = PersistenceManager.getEntityManager();
807 EntityTransaction tx = em.getTransaction();
808try {
809 tx.begin();
810//Long id = 0L;811try {
812 cid.setOriginatingUSN((Long) em.createQuery("select MAX(e.originatingUSN) from ChangeRecord e where e.nodeID = :node")
813 .setParameter("node", sourcenode)
814 .getSingleResult());
815 } catch (Exception ex) {
816 logger.info("unexpected error searching for last record from " + sourcenode, ex);
817 }
818819 tx.rollback();
820821 } catch (Exception drfm) {
822 logger.warn("error caught fetching newest record from node " + sourcenode, drfm);
823 } finally {
824if (tx.isActive()) {
825 tx.rollback();
826 }
827 em.close();
828 }
829 logger.info("Highest known record for " + sourcenode + " is " + cid.getOriginatingUSN());
830 ret.getHighWaterMark().add(cid);
831832return ret;
833 }
834835privatevoid enqueueAllReceivingNodes() {
836if (queue == null) {
837 queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
838 }
839//get the replication config840//get everyone we are expecting to receive data from, then enqueue them for pulling841ReplicationConfiguration repcfg = ReplicationNotifier.FetchEdges();
842if (repcfg == null) {
843return;
844 }
845 Set<String> allnodes = new HashSet<String>();
846for (int i = 0; i < repcfg.getOperator().size(); i++) {
847 allnodes.add(repcfg.getOperator().get(i).getOperatorNodeID());
848 }
849 Set<String> receivers = new HashSet<String>();
850if (repcfg.getCommunicationGraph() == null851 || repcfg.getCommunicationGraph().getEdge().isEmpty()) {
852//no edges or graph defined, default to the operator list853for (org.uddi.repl_v3.Operator o : repcfg.getOperator()) {
854//no need to tell myself about a change at myself855if (!o.getOperatorNodeID().equalsIgnoreCase(getNode())) {
856 receivers.add(o.getOperatorNodeID());
857 }
858 }
859 } else {
860//repcfg.getCommunicationGraph()861 Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
862while (iterator.hasNext()) {
863Edge next = iterator.next();
864865if (next.getMessageReceiver().equalsIgnoreCase(getNode())) {
866 receivers.add(next.getMessageSender());
867 }
868869 }
870871 }
872for (String s : receivers) {
873//this is a list of nodes that this node is expecting updates from874//here are we ticking the notification engine to ping the remove service for updates875for (String nodeping : allnodes) {
876 queue.add(new NotifyChangeRecordsAvailable(s, getLastChangeRecordFrom(nodeping)));
877//for each node we are expecting data from, go fetch it, along the way, we'll request all data for all nodes878//that we know about879 }
880881 }
882 }
883884 }
885886/**887 * used to check for alterations on *this node's data from another node,888 * which isn't allowed889 *890 * @param ue891 * @param node892 * @throws Exception893 */894privatestaticvoid validateNodeIdMisMatches(UddiEntity ue, String node) throws Exception {
895if (ue == null) {
896return;//object doesn't exist897 }
898if (ue.getNodeId().equals(node)) {
899thrownew Exception("Alert! attempt to alter locally owned entity " + ue.getEntityKey() + " owned by " + ue.getAuthorizedName() + "@" + ue.getNodeId());
900 }
901 }
902903/**904 * use to validate that changed data maintained ownership, except for905 * business entities and tmodels since they allow transfer906 *907 * @param newNodeId908 * @param currentOwningNode909 * @throws Exception910 */911privatevoid validateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception {
912if (newNodeId == null || currentOwningNode == null) {
913thrownew Exception("either the local node ID is null or the inbound replication data's node id is null");
914 }
915//only time this is allowed is custody transfer916if (!newNodeId.equals(currentOwningNode)) {
917 logger.info("AUDIT, custody transfer from node, " + currentOwningNode + " to " + newNodeId + " current node is " + getNode());
918//throw new Exception("node id mismatch!");919 }
920921//if i already have a record and "own it" and the remote node has a record with the same key, reject the update922//1.5.8 923/**924 * Each node has custody of a portion of the aggregate data925 * managed by the registry of which it is a part. Each datum is926 * by definition in the custody of exactly one such node. A927 * datum in this context can be a businessEntity, a928 * businessService, a bindingTemplate, a tModel, or a929 * publisherAssertion. Changes to a datum in the registry MUST930 * originate at the node which is the custodian of the datum.931 * The registry defines the policy for data custody and, if932 * allowed, the custodian node for a given datum can be changed;933 * such custody transfer processes are discussed in Section 5.4934 * Custody and Ownership Transfer API.935 */936//so someone else attempted to update one of my records, reject it937if (newNodeId.equals(getNode())) {
938//throw new Exception("node id mismatch! this node already has a record for key " + newDataOperationalInfo.getEntityKey() + " and I'm the authority for it.");939 }
940 }
941942privatesynchronized UDDIReplicationPortType getReplicationClient(String node) {
943if (cache.containsKey(node)) {
944return cache.get(node);
945 }
946 UDDIService svc = new UDDIService();
947 UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
948 TransportSecurityHelper.applyTransportSecurity((BindingProvider) replicationClient);
949950 EntityManager em = PersistenceManager.getEntityManager();
951 EntityTransaction tx = em.getTransaction();
952try {
953 tx.begin();
954 StringBuilder sql = new StringBuilder();
955 sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc");
956//sql.toString();957 Query qry = em.createQuery(sql.toString());
958 qry.setMaxResults(1);
959960 org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
961for (Operator o : resultList.getOperator()) {
962if (o.getOperatorNodeID().equalsIgnoreCase(node)) {
963 ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());
964 cache.put(node, replicationClient);
965return replicationClient;
966 }
967 }
968 tx.rollback();
969970 } catch (Exception ex) {
971 logger.fatal("Node not found!" + node, ex);
972 } finally {
973if (tx.isActive()) {
974 tx.rollback();
975 }
976 em.close();
977 }
978//em.close();979returnnull;
980981 }
982private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
983984/**985 * @since 3.3986 * @param body987 * @return988 * @throws DispositionReportFaultMessage989 */990public String doPing(DoPing body) throws DispositionReportFaultMessage {
991long startTime = System.currentTimeMillis();
992long procTime = System.currentTimeMillis() - startTime;
993 serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
994995return getNode();
996997 }
998999 @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
1000 @WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body")
1001// @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords")1002 @Override
1003public org.uddi.repl_v3.ChangeRecords getChangeRecords(
1004 @WebParam(partName = "body", name = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3") org.uddi.repl_v3.GetChangeRecords body
1005 ) throws DispositionReportFaultMessage, RemoteException {
1006long startTime = System.currentTimeMillis();
1007 String requestingNode = body.getRequestingNode();
1008 HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen();
1009 BigInteger responseLimitCount = body.getResponseLimitCount();
1010 HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector();
10111012newValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
10131014//TODO should we validate that "requestingNode" is in the replication config?1015 List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
1016 EntityManager em = PersistenceManager.getEntityManager();
1017 EntityTransaction tx = em.getTransaction();
10181019/**1020 * More specifically, the recipient determines the particular1021 * change records that are returned by comparing the originating1022 * USNs in the caller’s high water mark vector with the1023 * originating USNs of each of the changes the recipient has1024 * seen from others or generated by itself. The recipient SHOULD1025 * only return change records that have originating USNs that1026 * are greater than those listed in the changesAlreadySeen1027 * highWaterMarkVector and less than the limit required by1028 * either the responseLimitCount or the responseLimitVector.1029 *1030 *1031 * Part of the message is a high water mark vector that contains1032 * for each node of the registry the originating USN of the most1033 * recent change record that has been successfully processed by1034 * the invocating node1035 */1036try {
1037int maxrecords = AppConfig.getConfiguration().getInt(Property.JUDDI_REPLICATION_GET_CHANGE_RECORDS_MAX, 100);
1038if (responseLimitCount != null) {
1039 maxrecords = responseLimitCount.intValue();
1040 }
1041 tx.begin();
1042 Long firstrecord = 0L;
1043 Long lastrecord = null;
1044 Query createQuery = null;
1045//SELECT t0.id, t0.change_contents, t0.entity_key, t0.appliedlocal, t0.node_id, t0.orginating_usn, t0.record_type FROM j3_chg_record t0 WHERE (t0.id > NULL AND t0.node_id = ?) ORDER BY t0.id ASC 1046if (changesAlreadySeen != null) {
1047//this is basically a lower limit (i.e. the newest record that was processed by the requestor1048//therefore we want the oldest record stored locally to return to the requestor for processing1049for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
1050 firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN();
1051if (firstrecord == null) {
1052 firstrecord = 0L;
1053 }
1054if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(getNode())) {
1055//special case, search by database id1056 createQuery = em.createQuery("select e from ChangeRecord e where "1057 + "(e.id > :inbound AND e.nodeID = :node) "1058 + "order by e.id ASC");
10591060 } else {
1061 createQuery = em.createQuery("select e from ChangeRecord e where "1062 + "e.originatingUSN > :inbound AND e.nodeID = :node "1063 + "order by e.originatingUSN ASC");
1064 }
1065 logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
1066 logger.info("This node is " + getNode() + ", request is for data originated from " + changesAlreadySeen.getHighWaterMark().get(i).getNodeID() + " and it's being sent back to " + requestingNode);
10671068 createQuery.setMaxResults(maxrecords);
1069 createQuery.setParameter("inbound", firstrecord);
1070 createQuery.setParameter("node", changesAlreadySeen.getHighWaterMark().get(i).getNodeID());
1071 List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
1072 logger.info(records.size() + " CR records returned from query");
1073for (int x = 0; x < records.size(); x++) {
1074ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(x));
1075//if (!Excluded(changesAlreadySeen, r)) {1076 ret.add(r);
1077//}10781079 }
1080 }
1081 } /*if (responseLimitVector != null) {1082 //using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned.1083 //upper limit basically1084 for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) {1085 //if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {1086 lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();1087 //}1088 }1089 }*/else {
1090if (firstrecord == null) {
1091 firstrecord = 0L;
1092 }
1093//assume that they just want records that originated from here?1094 logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
1095 logger.info("This node is " + getNode() + " requesting node " + requestingNode);
10961097if (lastrecord != null) {
1098 createQuery = em.createQuery("select e from ChangeRecord e where "1099 + "(e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) "1100 + "order by e.id ASC");
1101 createQuery.setParameter("lastrecord", lastrecord);
1102 } else {
1103 createQuery = em.createQuery("select e from ChangeRecord e where "1104 + "(e.id > :inbound AND e.nodeID = :node) "1105 + "order by e.id ASC");
1106 }
1107 createQuery.setMaxResults(maxrecords);
1108 createQuery.setParameter("inbound", firstrecord);
1109 createQuery.setParameter("node", getNode());
11101111 List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
1112 logger.info(records.size() + " CR records returned from query");
1113for (int i = 0; i < records.size(); i++) {
1114ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
1115//if (!Excluded(changesAlreadySeen, r)) {1116 ret.add(r);
1117//}11181119 }
1120 }
1121 tx.rollback();
1122long procTime = System.currentTimeMillis() - startTime;
1123 serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
1124 QueryStatus.SUCCESS, procTime);
11251126 } catch (Exception ex) {
1127 logger.fatal("Error, this node is: " + getNode(), ex);
1128thrownewFatalErrorException(newErrorMessage("E_fatalError", ex.getMessage()));
11291130 } finally {
1131if (tx.isActive()) {
1132 tx.rollback();
1133 }
1134 em.close();
1135 }
1136 logger.info("Change records returned for " + requestingNode + ": " + ret.size());
1137//JAXB.marshal(ret, System.out);1138 ChangeRecords x = new ChangeRecords();
1139 x.getChangeRecord().addAll(ret);
1140//JAXB.marshal(x, System.out);1141return x;
1142 }
11431144/**1145 * This UDDI API message provides a means to obtain a list of1146 * highWaterMark element containing the highest known USN for all nodes1147 * in the replication graph. If there is no graph, we just return the1148 * local bits1149 *1150 * @return1151 * @throws DispositionReportFaultMessage1152 */1153 @Override
1154public List<ChangeRecordIDType> getHighWaterMarks()
1155throws DispositionReportFaultMessage {
1156long startTime = System.currentTimeMillis();
11571158 List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
11591160//fetch from database the highest known watermark1161ReplicationConfiguration FetchEdges = FetchEdges();
11621163 EntityManager em = PersistenceManager.getEntityManager();
1164 EntityTransaction tx = em.getTransaction();
1165 HashMap<String, Long> map = new HashMap<String, Long>();
1166try {
1167 tx.begin();
1168if (FetchEdges != null) {
1169 Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
1170while (it.hasNext()) {
1171 String nextNode = it.next();
1172if (!nextNode.equals(getNode())) {
1173if (!map.containsKey(nextNode)) {
1174 Long id = 0L;
1175try {
1176 id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
1177 } catch (Exception ex) {
1178 logger.debug(ex);
1179 }
1180if (id == null) {
1181 id = 0L;
1182//per the spec1183 }
1184 map.put(nextNode, id);
11851186 }
1187 }
1188 }
1189 }
1190//dont forget this node1191 Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc")
1192 .setParameter("node", getNode()).setMaxResults(1).getSingleResult();
1193if (id == null) {
1194 id = 0L;
1195 }
1196 ChangeRecordIDType x = new ChangeRecordIDType();
1197 x.setNodeID(getNode());
1198 x.setOriginatingUSN(id);
1199 ret.add(x);
12001201 tx.rollback();
1202long procTime = System.currentTimeMillis() - startTime;
1203 serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
12041205 } catch (Exception drfm) {
1206thrownewFatalErrorException(newErrorMessage("E_fatalError", drfm.getMessage()));
12071208 } finally {
1209if (tx.isActive()) {
1210 tx.rollback();
1211 }
1212 em.close();
1213 }
12141215 Iterator<Map.Entry<String, Long>> iterator = map.entrySet().iterator();
1216while (iterator.hasNext()) {
1217 Map.Entry<String, Long> next = iterator.next();
1218 ret.add(new ChangeRecordIDType(next.getKey(), next.getValue()));
1219 }
1220return ret;
1221 }
12221223/**1224 * this means that another node has a change and we need to pick up the1225 * change and apply it to our local database.1226 *1227 * @param body1228 * @throws DispositionReportFaultMessage1229 */1230 @Override
1231publicvoid notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
1232throws DispositionReportFaultMessage {
1233long startTime = System.currentTimeMillis();
12341235//some other node just told us there's new records available, call1236//getChangeRecords from the remote node asynch1237newValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
12381239 logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size() + " this node is " + getNode());
1240//if (!queue.contains(body.getNotifyingNode())) {1241 queue.add(body);
1242//}1243long procTime = System.currentTimeMillis() - startTime;
1244 serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
1245 QueryStatus.SUCCESS, procTime);
1246 }
1247privatestatic Queue<NotifyChangeRecordsAvailable> queue = null;
12481249/**1250 * transfers custody of an entity from node1/user1 to node2/user21251 *1252 * assume this node is node 2.1253 *1254 * user1 on node1 requests a transfer token. node 1 issues the token.1255 *1256 * user1 now has a transfer token for their stuff user now takes the1257 * token to node 2 and calls transferEntities1258 * <img src="http://www.uddi.org/pubs/uddi-v3.0.2-20041019_files/image086.gif">1259 *1260 * @param body1261 * @throws DispositionReportFaultMessage1262 */1263 @Override
1264publicvoid transferCustody(TransferCustody body)
1265throws DispositionReportFaultMessage {
1266long startTime = System.currentTimeMillis();
1267 EntityManager em = PersistenceManager.getEntityManager();
1268 EntityTransaction tx = em.getTransaction();
1269 logger.info("Inbound transfer request (via replication api, node to node");
1270try {
1271 tx.begin();
1272//*this node is transfering data to another node1273//ValidateReplication.unsupportedAPICall();1274//a remote node just told me to give up control of some of my entities12751276//EntityTransaction tx = em.getTransaction();1277//confirm i have a replication config1278boolean ok = false;
1279ReplicationConfiguration FetchEdges = ReplicationNotifier.FetchEdges();
1280if (FetchEdges != null) {
1281for (int i = 0; i < FetchEdges.getOperator().size(); i++) {
1282//confirm that the destination node is in the replication config1283if (FetchEdges.getOperator().get(i).getOperatorNodeID().equals(body.getTransferOperationalInfo().getNodeID())) {
1284 ok = true;
1285break;
1286 }
1287 }
1288 }
1289if (!ok) {
1290thrownewTransferNotAllowedException(newErrorMessage("E_transferNotAllowedUnknownNode"));
1291 }
12921293newValidateReplication(null).validateTransfer(em, body);
12941295 TransferEntities te = new TransferEntities();
1296 te.setKeyBag(body.getKeyBag());
1297 te.setTransferToken(body.getTransferToken());
1298 te.setAuthInfo(null);
1299//make the change1300//enqueue in replication notifier1301//discard the token1302 logger.debug("request validated, processing transfer");
1303 List<ChangeRecord> executeTransfer = newUDDICustodyTransferImpl().executeTransfer(te, em, body.getTransferOperationalInfo().getAuthorizedName(), body.getTransferOperationalInfo().getNodeID());
13041305for (ChangeRecord c : executeTransfer) {
1306try {
1307 c.setChangeID(new ChangeRecordIDType());
1308 c.getChangeID().setNodeID(getNode());
1309 c.getChangeID().setOriginatingUSN(null);
1310 ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(c));
1311 } catch (UnsupportedEncodingException ex) {
1312 logger.error("", ex);
1313 }
1314 }
1315/**1316 * The custodial node must verify that it has granted1317 * permission to transfer the entities identified and1318 * that this permission is still valid. This operation1319 * is comprised of two steps:1320 *1321 * 1. Verification that the transferToken was issued by1322 * it, that it has not expired, that it represents the1323 * authority to transfer no more and no less than those1324 * entities identified by the businessKey and tModelKey1325 * elements and that all these entities are still valid1326 * and not yet transferred. The transferToken is1327 * invalidated if any of these conditions are not met.1328 *1329 * 2. If the conditions above are met, the custodial1330 * node will prevent any further changes to the entities1331 * identified by the businessKey and tModelKey elements1332 * identified. The entity will remain in this state1333 * until the replication stream indicates it has been1334 * successfully processed via the replication stream.1335 * Upon successful verification of the custody transfer1336 * request by the custodial node, an empty message is1337 * returned by it indicating the success of the request1338 * and acknowledging the custody transfer. Following the1339 * issue of the empty message, the custodial node will1340 * submit into the replication stream a1341 * changeRecordNewData providing in the operationalInfo,1342 * the nodeID accepting custody of the datum and the1343 * authorizedName of the publisher accepting ownership.1344 * The acknowledgmentRequested attribute of this change1345 * record MUST be set to "true".1346 *1347 *1348 *1349 * Finally, the custodial node invalidates the1350 * transferToken in order to prevent additional calls of1351 * the transfer_entities API.1352 */1353 tx.commit();
1354long procTime = System.currentTimeMillis() - startTime;
1355 serviceCounter.update(ReplicationQuery.TRANSFER_CUSTODY,
1356 QueryStatus.SUCCESS, procTime);
1357 } catch (DispositionReportFaultMessage d) {
1358 logger.error("Unable to process node to node custody transfer ", d);
1359throw d;
1360 } finally {
1361if (em != null && em.isOpen()) {
1362 em.close();
1363 }
1364if (tx.isActive()) {
1365 tx.rollback();
1366 }
1367 }
1368 }
13691370 }