This project has retired. For details please refer to its Attic page.
UDDIReplicationImpl xref
View Javadoc
1   /*
2    * Copyright 2001-2008 The Apache Software Foundation.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  package org.apache.juddi.api.impl;
18  
19  import java.io.StringWriter;
20  import java.io.UnsupportedEncodingException;
21  import java.math.BigInteger;
22  import java.rmi.RemoteException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.Timer;
32  import java.util.TimerTask;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import javax.jws.WebParam;
35  import javax.jws.WebResult;
36  import javax.jws.WebService;
37  import javax.jws.soap.SOAPBinding;
38  import javax.persistence.EntityManager;
39  import javax.persistence.EntityTransaction;
40  import javax.persistence.Query;
41  import javax.xml.bind.JAXB;
42  import javax.xml.bind.annotation.XmlSeeAlso;
43  import javax.xml.ws.BindingProvider;
44  import org.apache.juddi.api.util.QueryStatus;
45  import org.apache.juddi.api.util.ReplicationQuery;
46  import org.apache.juddi.config.AppConfig;
47  import org.apache.juddi.config.PersistenceManager;
48  import org.apache.juddi.config.Property;
49  import org.apache.juddi.mapping.MappingApiToModel;
50  import org.apache.juddi.mapping.MappingModelToApi;
51  import org.apache.juddi.model.BindingTemplate;
52  import org.apache.juddi.model.BusinessEntity;
53  import org.apache.juddi.model.BusinessService;
54  import org.apache.juddi.model.Operator;
55  import org.apache.juddi.model.PublisherAssertion;
56  import org.apache.juddi.model.PublisherAssertionId;
57  import org.apache.juddi.model.Tmodel;
58  import org.apache.juddi.model.UddiEntity;
59  import org.apache.juddi.replication.ReplicationNotifier;
60  import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
61  import org.apache.juddi.v3.client.UDDIService;
62  import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
63  import org.apache.juddi.v3.error.ErrorMessage;
64  import org.apache.juddi.v3.error.FatalErrorException;
65  import org.apache.juddi.v3.error.TransferNotAllowedException;
66  import org.apache.juddi.validation.ValidateReplication;
67  import org.uddi.custody_v3.TransferEntities;
68  import org.uddi.repl_v3.ChangeRecord;
69  import org.uddi.repl_v3.ChangeRecordAcknowledgement;
70  import org.uddi.repl_v3.ChangeRecordIDType;
71  import org.uddi.repl_v3.ChangeRecords;
72  import org.uddi.repl_v3.CommunicationGraph.Edge;
73  import org.uddi.repl_v3.DoPing;
74  import org.uddi.repl_v3.GetChangeRecords;
75  import org.uddi.repl_v3.HighWaterMarkVectorType;
76  import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
77  import org.uddi.repl_v3.ReplicationConfiguration;
78  import org.uddi.repl_v3.TransferCustody;
79  import org.uddi.v3_service.DispositionReportFaultMessage;
80  import org.uddi.v3_service.UDDIReplicationPortType;
81  
82  /**
83   * UDDI Replication defines four APIs. The first two presented here are used to
84   * perform replication and issue notifications. The latter ancillary APIs
85   * provide support for other aspects of UDDI Replication.
86   * <ul>
87   * <li>get_changeRecords</li>
88   * <li>notify_changeRecordsAvailable</li>
89   * <li>do_ping</li>
90   * <li>get_highWaterMarks</li></ul>
91   *
92   * @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>
93   */
94  @WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:api_v3_portType",
95          endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType")
96  @XmlSeeAlso({
97          org.uddi.custody_v3.ObjectFactory.class,
98          org.uddi.repl_v3.ObjectFactory.class,
99          org.uddi.subr_v3.ObjectFactory.class,
100         org.uddi.api_v3.ObjectFactory.class,
101         org.uddi.vscache_v3.ObjectFactory.class,
102         org.uddi.vs_v3.ObjectFactory.class,
103         org.uddi.sub_v3.ObjectFactory.class,
104         org.w3._2000._09.xmldsig_.ObjectFactory.class,
105         org.uddi.policy_v3.ObjectFactory.class,
106         org.uddi.policy_v3_instanceparms.ObjectFactory.class
107 })
108 public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType {
109 
110         static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig, AuthenticatedService service) {
111 
112                 //if the config is different
113                 Set<String> oldnodes = getNodes(oldConfig);
114                 Set<String> newNodes = getNodes(newConfig);
115 
116                 Set<String> addedNodes = diffNodeList(oldnodes, newNodes);
117                 if (queue == null) {
118                         queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
119                 }
120                 for (String s : addedNodes) {
121                         if (!s.equals(service.getNode())) {
122                                 logger.info("This node: " + service.getNode() + ". New replication node queue for synchronization: " + s);
123                                 HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();
124                                 highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L));
125                                 queue.add(new NotifyChangeRecordsAvailable(s, highWaterMarkVectorType));
126                         }
127                 }
128 
129         }
130 
131         private static Set<String> getNodes(ReplicationConfiguration oldConfig) {
132                 Set<String> ret = new HashSet<String>();
133                 if (oldConfig == null) {
134                         return ret;
135                 }
136                 for (org.uddi.repl_v3.Operator o : oldConfig.getOperator()) {
137                         ret.add(o.getOperatorNodeID());
138                 }
139                 if (oldConfig.getCommunicationGraph() != null) {
140                         ret.addAll(oldConfig.getCommunicationGraph().getNode());
141                 }
142                 return ret;
143         }
144 
145         /**
146          * returns items in "newNodes" that are not in "oldNodes"
147          *
148          * @param oldnodes
149          * @param newNodes
150          * @return
151          */
152         private static Set<String> diffNodeList(Set<String> oldnodes, Set<String> newNodes) {
153                 Set<String> diff = new HashSet<String>();
154                 Iterator<String> iterator = newNodes.iterator();
155                 while (iterator.hasNext()) {
156                         String lhs = iterator.next();
157                         Iterator<String> iterator1 = oldnodes.iterator();
158                         boolean found = false;
159                         while (iterator1.hasNext()) {
160                                 String rhs = iterator1.next();
161                                 if (rhs.equalsIgnoreCase(lhs)) {
162                                         found = true;
163                                         break;
164                                 }
165                         }
166                         if (!found) {
167                                 diff.add(lhs);
168                         }
169 
170                 }
171                 return diff;
172         }
173 
174         private UDDIServiceCounter serviceCounter;
175 
176         private static PullTimerTask timer = null;
177         private long startBuffer;
178         private long interval;
179 
180         private static UDDIPublicationImpl pub = null;
181 
182         public UDDIReplicationImpl() {
183                 super();
184                 try {
185                         this.interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000L);
186                         this.startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000L);
187                 } catch (Exception ex) {
188                         logger.warn("Config error!", ex);
189                 }
190                
191                 serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
192                 init();
193 
194         }
195 
196         private synchronized void init() {
197                 if (pub == null) {
198                         pub = new UDDIPublicationImpl();
199                 }
200                 if (queue == null) {
201                         queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
202                 }
203                 timer = new PullTimerTask();
204 
205         }
206 
207 
208         /**
209          * handles when a remote node tells me that there's an update(s)
210          * available
211          */
212         private class PullTimerTask extends TimerTask {
213 
214                 private Timer timer = null;
215 
216                 public PullTimerTask() {
217                         super();
218                         timer = new Timer(true);
219                         timer.scheduleAtFixedRate(this, startBuffer, interval);
220                 }
221                 boolean firstrun = true;
222 
223                 @Override
224                 public void run() {
225                         if (firstrun) {
226                                 enqueueAllReceivingNodes();
227                                 firstrun = false;
228                         }
229 
230                         if (!queue.isEmpty()) {
231                                 logger.info("Replication change puller thread started. Queue size: " + queue.size());
232                         }
233                         //ok someone told me there's a change available
234                         while (!queue.isEmpty()) {
235                                 NotifyChangeRecordsAvailable poll = queue.poll();
236                                 if (poll != null && !poll.getNotifyingNode().equalsIgnoreCase(getNode())) {
237                                         UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
238                                         if (replicationClient == null) {
239                                                 logger.fatal("unable to obtain a replication client to node " + poll);
240                                         } else {
241                                                 try {
242                                                         //get the high water marks for this node
243                                                         //ok now get all the changes
244 
245                                                         //done  replace with last known record from the given node
246                                                         //for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
247                                                         //        logger.info("Node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()
248                                                         //                + " USN " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN());
249                                                         //}
250                                                         Set<String> nodesHitThisCycle = new HashSet<String>();
251                                                         for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
252                                                                 int recordsreturned = 21;
253                                                                 while (recordsreturned >= 20) {
254                                                                         if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())) {
255                                                                                 logger.info("i've already hit the node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + " this cycle, skipping");
256                                                                                 break;
257                                                                         }
258                                                                         if (poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID().equalsIgnoreCase(getNode())) {
259                                                                                 logger.info("ignoring updates that were generated here " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN() + " sent by " + poll.getNotifyingNode() + " this node is " + getNode());
260                                                                                 break;
261                                                                         }
262                                                                         nodesHitThisCycle.add(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID());
263                                                                         GetChangeRecords body = new GetChangeRecords();
264                                                                         body.setRequestingNode(getNode());
265                                                                         body.setResponseLimitCount(BigInteger.valueOf(100L));
266 
267                                                                         body.setChangesAlreadySeen(getLastChangeRecordFrom(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()));
268                                                                         logger.info("fetching updates from " + poll.getNotifyingNode() + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getNodeID() + ":" + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + ", items still in the queue: " + queue.size());
269                                                                         //JAXB.marshal(body, System.out);
270                                                                         List<ChangeRecord> records
271                                                                                 = replicationClient.getChangeRecords(body).getChangeRecord();
272                                                                         //ok now we need to persist the change records
273                                                                         logger.info("Change records retrieved from " + poll.getNotifyingNode() + ", " + records.size());
274                                                                         for (int i = 0; i < records.size(); i++) {
275                                                                                 logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN());
276                                                                                 persistChangeRecord(records.get(i));
277                                                                         }
278                                                                         recordsreturned = records.size();
279                                                                 }
280                                                         }
281                                                 } catch (Exception ex) {
282                                                         logger.error("Error caught fetching replication changes from " + poll + " @" + ((BindingProvider) replicationClient).getRequestContext().get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY), ex);
283                                                 }
284                                         }
285                                 } else {
286                                         if (poll == null) {
287                                                 logger.warn("strange, popped a null object");
288                                         } else if (poll.getNotifyingNode().equalsIgnoreCase(getNode())) {
289                                                 logger.warn("strange, popped an object from the queue but it was from myself. This probably indicates a configuration error! ignoring...first record: " + poll.getChangesAvailable().getHighWaterMark().get(0).getNodeID()+":" + poll.getChangesAvailable().getHighWaterMark().get(0).getOriginatingUSN());
290                                         }
291                                 }
292                         }
293                 }
294 
295                 @Override
296                 public boolean cancel() {
297                         timer.cancel();
298                         return super.cancel();
299                 }
300 
301                 /**
302                  * someone told me there's a change available, we retrieved it
303                  * and are processing the changes locally.
304                  *
305                  * @param rec
306                  */
307                 private void persistChangeRecord(ChangeRecord rec) {
308                         if (rec == null) {
309                                 return;
310                         }
311                         logger.debug("_______________________Remote change request " + rec.getChangeID().getNodeID() + ":" + rec.getChangeID().getOriginatingUSN());
312 
313                         if (rec.getChangeID().getNodeID().equalsIgnoreCase(getNode())) {
314                                 logger.info("Just received a change record that i created, ignoring....");
315                                 return;
316                         }
317                         EntityManager em = PersistenceManager.getEntityManager();
318                         EntityTransaction tx = em.getTransaction();
319                         org.apache.juddi.model.ChangeRecord mapChangeRecord = null;
320                         /**
321                          * In nodes that support pre-bundled replication
322                          * responses, the recipient of the get_changeRecords
323                          * message MAY return more change records than requested
324                          * by the caller. In this scenario, the caller MUST also
325                          * be prepared to deal with such redundant changes where
326                          * a USN is less than the USN specified in the
327                          * changesAlreadySeen highWaterMarkVector.
328                          */
329 
330                         try {
331                                 tx.begin();
332                                 //check to see if we have this update already
333                                 Query createQuery = em.createQuery("select c from ChangeRecord c where c.nodeID=:node and c.originatingUSN=:oid");
334                                 createQuery.setParameter("node", rec.getChangeID().getNodeID());
335                                 createQuery.setParameter("oid", rec.getChangeID().getOriginatingUSN());
336                                 Object existingrecord = null;
337                                 try {
338                                         existingrecord = createQuery.getSingleResult();
339                                 } catch (Exception ex) {
340                                         logger.debug("error checking to see if change record exists already (expected failure)", ex);
341                                 }
342                                 if (existingrecord != null) {
343                                         logger.info("I've already processed change record " + rec.getChangeID().getNodeID() + " " + rec.getChangeID().getOriginatingUSN());
344                                         return;
345                                 }
346                                 //if it didn't come from here and i haven't seen it yet
347                                 ReplicationNotifier.EnqueueRetransmit(rec);
348                                 //the remotechange record rec must also be persisted!!
349                                 mapChangeRecord = MappingApiToModel.mapChangeRecord(rec);
350                                 mapChangeRecord.setId(null);
351                                 mapChangeRecord.setIsAppliedLocally(true);
352                                 em.persist(mapChangeRecord);
353                                 tx.commit();
354                                 logger.info("Remote CR saved, it was from " + mapChangeRecord.getNodeID() //this is the origin of the change
355                                         + " USN:" + mapChangeRecord.getOriginatingUSN()
356                                         + " Type:" + mapChangeRecord.getRecordType().name()
357                                         + " Key:" + mapChangeRecord.getEntityKey()
358                                         + " Local id from sender:" + mapChangeRecord.getId());
359                                 tx = em.getTransaction();
360                                 tx.begin();
361                                 //<editor-fold defaultstate="collapsed" desc="delete a record">
362 
363                                 if (rec.getChangeRecordDelete() != null) {
364                                         if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
365                                                 //delete a binding template
366                                                 UddiEntity ue = em.find(BindingTemplate.class, rec.getChangeRecordDelete().getBindingKey());
367                                                 validateNodeIdMisMatches(ue, getNode());
368                                                 pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
369                                         }
370                                         if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
371                                                 //delete a business 
372                                                 UddiEntity ue = em.find(BusinessEntity.class, rec.getChangeRecordDelete().getBusinessKey());
373                                                 validateNodeIdMisMatches(ue, getNode());
374                                                 pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
375                                         }
376                                         if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
377                                                 UddiEntity ue = em.find(BusinessService.class, rec.getChangeRecordDelete().getServiceKey());
378                                                 validateNodeIdMisMatches(ue, getNode());
379                                                 //delete a service 
380                                                 pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
381                                         }
382                                         if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
383                                                 //delete a tmodel 
384                                                 /**
385                                                  * The changeRecordDelete for a
386                                                  * tModel does not correspond to
387                                                  * any API described in this
388                                                  * specification and should only
389                                                  * appear in the replication
390                                                  * stream as the result of an
391                                                  * administrative function to
392                                                  * permanently remove a tModel.
393                                                  */
394                                                 UddiEntity tm = em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey());
395                                                 if (tm != null) {
396                                                         validateNodeIdMisMatches(tm, getNode());
397                                                         em.remove(tm);
398                                                 } else {
399                                                         logger.error("failed to adminstratively delete tmodel because it doesn't exist. " + rec.getChangeRecordDelete().getTModelKey());
400                                                 }
401                                                 //pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
402                                         }
403                                 }
404                                 if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
405                                         //delete a pa template                            
406                                         pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion(), em);
407                                 }
408 
409 //</editor-fold>
410                                 //<editor-fold defaultstate="collapsed" desc="New Data">
411                                 if (rec.getChangeRecordNewData() != null) {
412 
413                                         //The operationalInfo element MUST contain the operational information associated with the indicated new data.
414                                         if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
415                                                 logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
416                                         } else {
417                                                 if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
418                                                         throw new Exception("Inbound replication data is missiong node id! Change will not be applied");
419                                                 }
420                                                 if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equalsIgnoreCase(getNode())) {
421                                                         logger.warn("Inbound replication data is modifying locally owned data. This is not allowed, except for custody transfer");
422                                                 }
423                                                 if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
424                                                         //fetch the binding template if it exists already
425                                                         //if it exists, 
426                                                         //      confirm the owning node, it shouldn't be the local node id, if it is, throw
427                                                         //      the owning node should be the same as it was before
428 
429                                                         BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
430                                                         if (model == null) {
431                                                                 logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
432                                                         } else {
433                                                                 validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
434 
435                                                                 org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey());
436                                                                 if (bt != null) {
437                                                                         //ValidateNodeIdMatches(node, bt.getNodeId());
438                                                                         em.remove(bt);
439                                                                 }
440                                                                 bt = new BindingTemplate();
441                                                                 MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), bt, model);
442                                                                 MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewData().getOperationalInfo());
443                                                                 // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
444                                                                 em.persist(bt);
445                                                         }
446 
447                                                 } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {
448 
449                                                         BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
450                                                         if (model != null) {
451                                                                 //if the owner of the new data is me, and the update didn't originate from me
452                                                                 if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
453                                                                         && !model.getNodeId().equals(getNode())) {
454                                                                         if (model.getIsTransferInProgress()) {
455                                                                                 //allow the transfer
456                                                                                 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
457                                                                                 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
458                                                                                 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
459                                                                                 model.setIsTransferInProgress(false);
460                                                                                 em.merge(model);
461                                                                         } else {
462                                                                                 //block it, unexpected transfer
463                                                                                 throw new Exception("Unexpected entity transfer to to node " + getNode() + " from " + rec.getChangeID().getNodeID());
464                                                                         }
465 
466                                                                 } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
467                                                                         && model.getNodeId().equals(getNode())) {
468                                                                         //if destination is here and it's staying here, then this is strange also
469                                                                         //someone else updated one of my records
470                                                                         throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
471                                                                 } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
472                                                                         && model.getNodeId().equals(getNode())) {
473                                                                         //this is also strange, destination is elsewhere however it's owned by me.
474                                                                         throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());
475 
476                                                                 } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
477                                                                         && !model.getNodeId().equals(getNode())) {
478                                                                         //changes on a remote node, for an existing item
479                                                                         MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
480                                                                         MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
481                                                                         em.merge(model);
482 
483                                                                 }
484 
485                                                         } else {
486                                                                 model = new BusinessEntity();
487                                                                 MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
488                                                                 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
489                                                                 em.persist(model);
490                                                         }
491                                                 }
492                                                 if (rec.getChangeRecordNewData().getBusinessService() != null) {
493                                                         BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
494                                                         if (find == null) {
495                                                                 logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
496                                                         } else {
497 
498                                                                 org.apache.juddi.model.BusinessService model = null;
499                                                                 model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey());
500                                                                 if (model != null) {
501                                                                         validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
502                                                                         em.remove(model);
503                                                                 }
504 
505                                                                 model = new org.apache.juddi.model.BusinessService();
506                                                                 MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
507                                                                 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
508                                                                 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
509 
510                                                                 em.persist(model);
511                                                         }
512 
513                                                 } else if (rec.getChangeRecordNewData().getTModel() != null) {
514 
515                                                         Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey());
516                                                         if (model != null) {
517                                                                 //in the case of a transfer
518                                                                 //if the new entity is being transfer to ME, accept and i didn't previously own it, but only if the local record is flagged as transferable
519                                                                 //meaning, only accept if i'm expecting a transfer
520                                                                 if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
521                                                                         && !model.getNodeId().equals(getNode())) {
522                                                                         if (model.getIsTransferInProgress()) {
523                                                                                 //allow the transfer
524                                                                                 em.remove(model);
525                                                                                 model = new Tmodel();
526                                                                                 MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
527                                                                                 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
528                                                                                 model.setIsTransferInProgress(false);
529                                                                                 em.persist(model);
530                                                                         } else {
531                                                                                 //block it, unexpected transfer
532                                                                                 throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID());
533                                                                         }
534 
535                                                                 } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
536                                                                         && model.getNodeId().equals(getNode())) {
537                                                                         //if destination is here and it's staying here, then this is strange also
538                                                                         //someone else updated one of my records
539                                                                         throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
540                                                                 } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
541                                                                         && model.getNodeId().equals(getNode())) {
542                                                                         //this is also strange, destination is elsewhere however it's owned by me.
543                                                                         throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());
544 
545                                                                 } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
546                                                                         && !model.getNodeId().equals(getNode())) {
547                                                                         //changes on a remote node, for an existing item
548                                                                         em.remove(model);
549                                                                         model = new Tmodel();
550                                                                         MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
551 
552                                                                         MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
553 
554                                                                         em.persist(model);
555 
556                                                                 }
557                                                         } else {
558                                                                 model = new Tmodel();
559                                                                 MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
560 
561                                                                 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
562 
563                                                                 em.persist(model);
564                                                         }
565                                                 }
566 
567                                         }
568 
569                                 }
570 //</editor-fold>
571 
572                                 // changeRecordNull no action needed
573                                 // changeRecordHide tmodel only
574                                 //<editor-fold defaultstate="collapsed" desc="hide tmodel">
575                                 if (rec.getChangeRecordHide() != null) {
576                                         /*
577                                          A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification.  A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.
578                                         
579                                          The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.
580                                          */
581                                         String key = rec.getChangeRecordHide().getTModelKey();
582                                         org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
583                                         if (existing == null) {
584                                                 logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
585                                         } else {
586                                                 //no one else can delete/hide my tmodel
587                                                 validateNodeIdMisMatches(existing, getNode());
588                                                 existing.setDeleted(true);
589                                                 existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
590                                                 existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
591                                                 em.persist(existing);
592                                         }
593                                 }
594 //</editor-fold>
595 
596                                 //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">
597                                 if (rec.getChangeRecordPublisherAssertion() != null) {
598 
599                                         logger.info("Repl CR Publisher Assertion");
600                                         //TODO are publisher assertions owned by a given node?
601                                         PublisherAssertionId paid = new PublisherAssertionId(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey(), rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey());
602                                         org.apache.juddi.model.PublisherAssertion model = em.find(org.apache.juddi.model.PublisherAssertion.class, paid);
603                                         if (model != null) {
604                                                 logger.info("Repl CR Publisher Assertion - Existing");
605 
606                                                 if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
607                                                         model.setFromCheck("true");
608                                                 } else {
609                                                         model.setFromCheck("false");
610                                                 }
611 
612                                                 if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
613                                                         model.setToCheck("true");
614                                                 } else {
615                                                         model.setToCheck("false");
616                                                 }
617 
618                                                 model.setKeyName(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyName());
619                                                 model.setKeyValue(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyValue());
620                                                 model.setTmodelKey(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getTModelKey());
621                                                 model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());
622                                                 //model.setSignatures(MappingApiToModel.mapApiSignaturesToModelSignatures(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getSignature()));
623                                                 if ("false".equalsIgnoreCase(model.getFromCheck())
624                                                         && "false".equalsIgnoreCase(model.getToCheck())) {
625                                                         logger.warn("!!!New publisher assertion is both false and false, strange. no need to save it then!");
626                                                         em.remove(model);
627                                                 }
628                                                 em.merge(model);
629                                         } else {
630                                                 logger.info("Repl CR Publisher Assertion - new PA");
631 
632                                                 model = new PublisherAssertion();
633                                                 MappingApiToModel.mapPublisherAssertion(rec.getChangeRecordPublisherAssertion().getPublisherAssertion(), model);
634                                                 model.setBusinessEntityByFromKey(null);
635                                                 model.setBusinessEntityByToKey(null);
636                                                 model.setBusinessEntityByFromKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey()));
637                                                 model.setBusinessEntityByToKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey()));
638 
639                                                 if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
640                                                         model.setFromCheck("true");
641                                                 } else {
642                                                         model.setFromCheck("false");
643                                                 }
644 
645                                                 if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
646                                                         model.setToCheck("true");
647                                                 } else {
648                                                         model.setToCheck("false");
649                                                 }
650                                                 model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());
651                                                 em.persist(model);
652                                         }
653                                 }
654 //</editor-fold>
655 
656                                 if (rec.isAcknowledgementRequested()) {
657                                         ChangeRecord posack = new ChangeRecord();
658                                         posack.setChangeRecordAcknowledgement(new ChangeRecordAcknowledgement());
659                                         posack.getChangeRecordAcknowledgement().setAcknowledgedChange(rec.getChangeID());
660                                         posack.setAcknowledgementRequested(false);
661                                         ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(posack));
662                                 }
663                                 if (rec.getChangeRecordNewDataConditional() != null) {
664 
665                                         if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
666                                                 throw new Exception("Inbound replication data is missiong node id!");
667                                         }
668 
669                                         //The operationalInfo element MUST contain the operational information associated with the indicated new data.
670                                         if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo() == null) {
671                                                 logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
672                                         } else {
673                                                 if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate() != null) {
674                                                         //fetch the binding template if it exists already
675                                                         //if it exists, 
676                                                         //      confirm the owning node, it shouldn't be the local node id, if it is, throw
677                                                         //      the owning node should be the same as it was before
678 
679                                                         BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getServiceKey());
680                                                         if (model == null) {
681                                                                 logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
682                                                         } else {
683 
684                                                                 org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getBindingKey());
685                                                                 if (bt != null) {
686                                                                         validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), bt.getNodeId());
687 
688                                                                         em.remove(bt);
689                                                                 }
690                                                                 bt = new BindingTemplate();
691                                                                 MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate(), bt, model);
692                                                                 MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
693                                                                 // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
694                                                                 em.persist(bt);
695                                                         }
696 
697                                                 } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity() != null) {
698 
699                                                         BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity().getBusinessKey());
700                                                         if (model != null) {
701                                                                 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
702                                                                 //TODO revisit access control rules
703                                                                 em.remove(model);
704                                                         }
705                                                         model = new BusinessEntity();
706                                                         MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity(), model);
707                                                         // MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
708 
709                                                         MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
710                                                         logger.warn("Name size on save is " + model.getBusinessNames().size());
711                                                         em.persist(model);
712 
713                                                 }
714                                                 if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService() != null) {
715                                                         BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getBusinessKey());
716                                                         if (find == null) {
717                                                                 logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
718                                                         } else {
719 
720                                                                 org.apache.juddi.model.BusinessService model = null;
721                                                                 model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getServiceKey());
722                                                                 if (model != null) {
723                                                                         validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
724                                                                         em.remove(model);
725                                                                 }
726 
727                                                                 model = new org.apache.juddi.model.BusinessService();
728                                                                 MappingApiToModel.mapBusinessService(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService(), model, find);
729                                                                 MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
730                                                                 MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
731 
732                                                                 em.persist(model);
733                                                         }
734 
735                                                 } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel() != null) {
736 
737                                                         Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().getTModelKey());
738                                                         if (model != null) {
739                                                                 validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
740                                                                 em.remove(model);
741                                                         }
742                                                         model = new Tmodel();
743                                                         MappingApiToModel.mapTModel(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel(), model);
744 
745                                                         MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
746 
747                                                         em.persist(model);
748                                                 }
749 
750                                         }
751 
752                                 }
753                                 if (rec.getChangeRecordNull() != null) {
754                                         //No action required
755 
756                                 }
757                                 if (rec.getChangeRecordCorrection() != null) {
758                                         //TODO implement
759 
760                                 }
761                                 if (rec.getChangeRecordConditionFailed() != null) {
762                                         //TODO implement
763 
764                                 }
765                                 tx.commit();
766 
767                         } catch (Exception drfm) {
768 
769                                 logger.warn("Error applying the change record! ", drfm);
770                                 StringWriter sw = new StringWriter();
771                                 JAXB.marshal(rec, sw);
772                                 logger.warn("This is the record that failed to persist: " + sw.toString());
773                                 if (tx.isActive()) {
774                                         tx.rollback();
775                                 }
776                                 if (mapChangeRecord != null) {
777                                         //set the change record's isApplied to false
778                                         try {
779                                                 tx = em.getTransaction();
780                                                 tx.begin();
781                                                 mapChangeRecord.setIsAppliedLocally(false);
782                                                 em.merge(mapChangeRecord);
783                                                 tx.commit();
784                                         } catch (Exception e) {
785                                                 logger.error("error updating change record!!", e);
786                                                 if (tx.isActive()) {
787                                                         tx.rollback();
788                                                 }
789                                         }
790                                 } else {
791                                         logger.fatal("whoa! change record is null when saving a remote change record, this is unexpected and should be reported");
792                                 }
793                         } finally {
794                                 if (tx.isActive()) {
795                                         tx.rollback();
796                                 }
797                                 em.close();
798                         }
799                 }
800 
801                 private HighWaterMarkVectorType getLastChangeRecordFrom(String sourcenode) {
802                         HighWaterMarkVectorType ret = new HighWaterMarkVectorType();
803                         ChangeRecordIDType cid = new ChangeRecordIDType();
804                         cid.setNodeID(sourcenode);
805                         cid.setOriginatingUSN(0L);
806                         EntityManager em = PersistenceManager.getEntityManager();
807                         EntityTransaction tx = em.getTransaction();
808                         try {
809                                 tx.begin();
810                                 //Long id = 0L;
811                                 try {
812                                         cid.setOriginatingUSN((Long) em.createQuery("select MAX(e.originatingUSN) from ChangeRecord e where e.nodeID = :node")
813                                                 .setParameter("node", sourcenode)
814                                                 .getSingleResult());
815                                 } catch (Exception ex) {
816                                         logger.info("unexpected error searching for last record from " + sourcenode, ex);
817                                 }
818 
819                                 tx.rollback();
820 
821                         } catch (Exception drfm) {
822                                 logger.warn("error caught fetching newest record from node " + sourcenode, drfm);
823                         } finally {
824                                 if (tx.isActive()) {
825                                         tx.rollback();
826                                 }
827                                 em.close();
828                         }
829                         logger.info("Highest known record for " + sourcenode + " is " + cid.getOriginatingUSN());
830                         ret.getHighWaterMark().add(cid);
831 
832                         return ret;
833                 }
834 
835                 private void enqueueAllReceivingNodes() {
836                         if (queue == null) {
837                                 queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
838                         }
839                         //get the replication config
840                         //get everyone we are expecting to receive data from, then enqueue them for pulling
841                         ReplicationConfiguration repcfg = ReplicationNotifier.FetchEdges();
842                         if (repcfg == null) {
843                                 return;
844                         }
845                         Set<String> allnodes = new HashSet<String>();
846                         for (int i = 0; i < repcfg.getOperator().size(); i++) {
847                                 allnodes.add(repcfg.getOperator().get(i).getOperatorNodeID());
848                         }
849                         Set<String> receivers = new HashSet<String>();
850                         if (repcfg.getCommunicationGraph() == null
851                                 || repcfg.getCommunicationGraph().getEdge().isEmpty()) {
852                                 //no edges or graph defined, default to the operator list
853                                 for (org.uddi.repl_v3.Operator o : repcfg.getOperator()) {
854                                         //no need to tell myself about a change at myself
855                                         if (!o.getOperatorNodeID().equalsIgnoreCase(getNode())) {
856                                                 receivers.add(o.getOperatorNodeID());
857                                         }
858                                 }
859                         } else {
860                                 //repcfg.getCommunicationGraph()
861                                 Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
862                                 while (iterator.hasNext()) {
863                                         Edge next = iterator.next();
864 
865                                         if (next.getMessageReceiver().equalsIgnoreCase(getNode())) {
866                                                 receivers.add(next.getMessageSender());
867                                         }
868 
869                                 }
870 
871                         }
872                         for (String s : receivers) {
873                                 //this is a list of nodes that this node is expecting updates from
874                                 //here are we ticking the notification engine to ping the remove service for updates
875                                 for (String nodeping : allnodes) {
876                                         queue.add(new NotifyChangeRecordsAvailable(s, getLastChangeRecordFrom(nodeping)));
877                                         //for each node we are expecting data from, go fetch it, along the way, we'll request all data for all nodes
878                                         //that we know about
879                                 }
880 
881                         }
882                 }
883 
884         }
885 
886         /**
887          * used to check for alterations on *this node's data from another node,
888          * which isn't allowed
889          *
890          * @param ue
891          * @param node
892          * @throws Exception
893          */
894         private static void validateNodeIdMisMatches(UddiEntity ue, String node) throws Exception {
895                 if (ue == null) {
896                         return;//object doesn't exist
897                 }
898                 if (ue.getNodeId().equals(node)) {
899                         throw new Exception("Alert! attempt to alter locally owned entity " + ue.getEntityKey() + " owned by " + ue.getAuthorizedName() + "@" + ue.getNodeId());
900                 }
901         }
902 
903         /**
904          * use to validate that changed data maintained ownership, except for
905          * business entities and tmodels since they allow transfer
906          *
907          * @param newNodeId
908          * @param currentOwningNode
909          * @throws Exception
910          */
911         private  void validateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception {
912                 if (newNodeId == null || currentOwningNode == null) {
913                         throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
914                 }
915                 //only time this is allowed is custody transfer
916                 if (!newNodeId.equals(currentOwningNode)) {
917                         logger.info("AUDIT, custody transfer from node, " + currentOwningNode + " to " + newNodeId + " current node is " + getNode());
918                         //throw new Exception("node id mismatch!");
919                 }
920 
921                 //if i already have a record and "own it" and the remote node has a record with the same key, reject the update
922                 //1.5.8 
923                 /**
924                  * Each node has custody of a portion of the aggregate data
925                  * managed by the registry of which it is a part. Each datum is
926                  * by definition in the custody of exactly one such node. A
927                  * datum in this context can be a businessEntity, a
928                  * businessService, a bindingTemplate, a tModel, or a
929                  * publisherAssertion. Changes to a datum in the registry MUST
930                  * originate at the node which is the custodian of the datum.
931                  * The registry defines the policy for data custody and, if
932                  * allowed, the custodian node for a given datum can be changed;
933                  * such custody transfer processes are discussed in Section 5.4
934                  * Custody and Ownership Transfer API.
935                  */
936                 //so someone else attempted to update one of my records, reject it
937                 if (newNodeId.equals(getNode())) {
938                         //throw new Exception("node id mismatch! this node already has a record for key " + newDataOperationalInfo.getEntityKey() + " and I'm the authority for it.");
939                 }
940         }
941 
942         private synchronized UDDIReplicationPortType getReplicationClient(String node) {
943                 if (cache.containsKey(node)) {
944                         return cache.get(node);
945                 }
946                 UDDIService svc = new UDDIService();
947                 UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
948                 TransportSecurityHelper.applyTransportSecurity((BindingProvider) replicationClient);
949 
950                 EntityManager em = PersistenceManager.getEntityManager();
951                 EntityTransaction tx = em.getTransaction();
952                 try {
953                         tx.begin();
954                         StringBuilder sql = new StringBuilder();
955                         sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc");
956                         //sql.toString();
957                         Query qry = em.createQuery(sql.toString());
958                         qry.setMaxResults(1);
959 
960                         org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
961                         for (Operator o : resultList.getOperator()) {
962                                 if (o.getOperatorNodeID().equalsIgnoreCase(node)) {
963                                         ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());
964                                         cache.put(node, replicationClient);
965                                         return replicationClient;
966                                 }
967                         }
968                         tx.rollback();
969 
970                 } catch (Exception ex) {
971                         logger.fatal("Node not found!" + node, ex);
972                 } finally {
973                         if (tx.isActive()) {
974                                 tx.rollback();
975                         }
976                         em.close();
977                 }
978                 //em.close();
979                 return null;
980 
981         }
982         private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
983 
984         /**
985          * @since 3.3
986          * @param body
987          * @return
988          * @throws DispositionReportFaultMessage
989          */
990         public String doPing(DoPing body) throws DispositionReportFaultMessage {
991                 long startTime = System.currentTimeMillis();
992                 long procTime = System.currentTimeMillis() - startTime;
993                 serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
994 
995                 return getNode();
996 
997         }
998 
999         @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
1000         @WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body")
1001         // @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords")
1002         @Override
1003         public org.uddi.repl_v3.ChangeRecords getChangeRecords(
1004                 @WebParam(partName = "body", name = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3") org.uddi.repl_v3.GetChangeRecords body
1005         ) throws DispositionReportFaultMessage, RemoteException {
1006                 long startTime = System.currentTimeMillis();
1007                 String requestingNode = body.getRequestingNode();
1008                 HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen();
1009                 BigInteger responseLimitCount = body.getResponseLimitCount();
1010                 HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector();
1011 
1012                 new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
1013 
1014                 //TODO should we validate that "requestingNode" is in the replication config?
1015                 List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
1016                 EntityManager em = PersistenceManager.getEntityManager();
1017                 EntityTransaction tx = em.getTransaction();
1018 
1019                 /**
1020                  * More specifically, the recipient determines the particular
1021                  * change records that are returned by comparing the originating
1022                  * USNs in the caller’s high water mark vector with the
1023                  * originating USNs of each of the changes the recipient has
1024                  * seen from others or generated by itself. The recipient SHOULD
1025                  * only return change records that have originating USNs that
1026                  * are greater than those listed in the changesAlreadySeen
1027                  * highWaterMarkVector and less than the limit required by
1028                  * either the responseLimitCount or the responseLimitVector.
1029                  *
1030                  *
1031                  * Part of the message is a high water mark vector that contains
1032                  * for each node of the registry the originating USN of the most
1033                  * recent change record that has been successfully processed by
1034                  * the invocating node
1035                  */
1036                 try {
1037                         int maxrecords = AppConfig.getConfiguration().getInt(Property.JUDDI_REPLICATION_GET_CHANGE_RECORDS_MAX, 100);
1038                         if (responseLimitCount != null) {
1039                                 maxrecords = responseLimitCount.intValue();
1040                         }
1041                         tx.begin();
1042                         Long firstrecord = 0L;
1043                         Long lastrecord = null;
1044                         Query createQuery = null;
1045 //SELECT t0.id, t0.change_contents, t0.entity_key, t0.appliedlocal, t0.node_id, t0.orginating_usn, t0.record_type FROM j3_chg_record t0 WHERE (t0.id > NULL AND t0.node_id = ?) ORDER BY t0.id ASC                        
1046                         if (changesAlreadySeen != null) {
1047                                 //this is basically a lower limit (i.e. the newest record that was processed by the requestor
1048                                 //therefore we want the oldest record stored locally to return to the requestor for processing
1049                                 for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
1050                                         firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN();
1051                                         if (firstrecord == null) {
1052                                                 firstrecord = 0L;
1053                                         }
1054                                         if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(getNode())) {
1055                                                 //special case, search by database id
1056                                                 createQuery = em.createQuery("select e from ChangeRecord e where "
1057                                                         + "(e.id > :inbound AND e.nodeID = :node) "
1058                                                         + "order by e.id ASC");
1059 
1060                                         } else {
1061                                                 createQuery = em.createQuery("select e from ChangeRecord e where "
1062                                                         + "e.originatingUSN > :inbound AND e.nodeID = :node "
1063                                                         + "order by e.originatingUSN ASC");
1064                                         }
1065                                         logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
1066                                         logger.info("This node is " + getNode() + ", request is for data originated from " + changesAlreadySeen.getHighWaterMark().get(i).getNodeID() + " and it's being sent back to " + requestingNode);
1067 
1068                                         createQuery.setMaxResults(maxrecords);
1069                                         createQuery.setParameter("inbound", firstrecord);
1070                                         createQuery.setParameter("node", changesAlreadySeen.getHighWaterMark().get(i).getNodeID());
1071                                         List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
1072                                         logger.info(records.size() + " CR records returned from query");
1073                                         for (int x = 0; x < records.size(); x++) {
1074                                                 ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(x));
1075                                                 //if (!Excluded(changesAlreadySeen, r)) {
1076                                                 ret.add(r);
1077                                                 //}
1078 
1079                                         }
1080                                 }
1081                         } /*if (responseLimitVector != null) {
1082                          //using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned.
1083                          //upper limit basically
1084                          for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) {
1085                          //if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
1086                          lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
1087                          //}
1088                          }
1089                          }*/ else {
1090                                 if (firstrecord == null) {
1091                                         firstrecord = 0L;
1092                                 }
1093                                 //assume that they just want records that originated from here?
1094                                 logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
1095                                 logger.info("This node is " + getNode() + " requesting node " + requestingNode);
1096 
1097                                 if (lastrecord != null) {
1098                                         createQuery = em.createQuery("select e from ChangeRecord e where "
1099                                                 + "(e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) "
1100                                                 + "order by e.id ASC");
1101                                         createQuery.setParameter("lastrecord", lastrecord);
1102                                 } else {
1103                                         createQuery = em.createQuery("select e from ChangeRecord e where "
1104                                                 + "(e.id > :inbound AND e.nodeID = :node) "
1105                                                 + "order by e.id ASC");
1106                                 }
1107                                 createQuery.setMaxResults(maxrecords);
1108                                 createQuery.setParameter("inbound", firstrecord);
1109                                 createQuery.setParameter("node", getNode());
1110 
1111                                 List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
1112                                 logger.info(records.size() + " CR records returned from query");
1113                                 for (int i = 0; i < records.size(); i++) {
1114                                         ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
1115                                         //if (!Excluded(changesAlreadySeen, r)) {
1116                                         ret.add(r);
1117                                         //}
1118 
1119                                 }
1120                         }
1121                         tx.rollback();
1122                         long procTime = System.currentTimeMillis() - startTime;
1123                         serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
1124                                 QueryStatus.SUCCESS, procTime);
1125 
1126                 } catch (Exception ex) {
1127                         logger.fatal("Error, this node is: " + getNode(), ex);
1128                         throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
1129 
1130                 } finally {
1131                         if (tx.isActive()) {
1132                                 tx.rollback();
1133                         }
1134                         em.close();
1135                 }
1136                 logger.info("Change records returned for " + requestingNode + ": " + ret.size());
1137                 //JAXB.marshal(ret, System.out);
1138                 ChangeRecords x = new ChangeRecords();
1139                 x.getChangeRecord().addAll(ret);
1140                 //JAXB.marshal(x, System.out);
1141                 return x;
1142         }
1143 
1144         /**
1145          * This UDDI API message provides a means to obtain a list of
1146          * highWaterMark element containing the highest known USN for all nodes
1147          * in the replication graph. If there is no graph, we just return the
1148          * local bits
1149          *
1150          * @return
1151          * @throws DispositionReportFaultMessage
1152          */
1153         @Override
1154         public List<ChangeRecordIDType> getHighWaterMarks()
1155                 throws DispositionReportFaultMessage {
1156                 long startTime = System.currentTimeMillis();
1157 
1158                 List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
1159 
1160                 //fetch from database the highest known watermark
1161                 ReplicationConfiguration FetchEdges = FetchEdges();
1162 
1163                 EntityManager em = PersistenceManager.getEntityManager();
1164                 EntityTransaction tx = em.getTransaction();
1165                 HashMap<String, Long> map = new HashMap<String, Long>();
1166                 try {
1167                         tx.begin();
1168                         if (FetchEdges != null) {
1169                                 Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
1170                                 while (it.hasNext()) {
1171                                         String nextNode = it.next();
1172                                         if (!nextNode.equals(getNode())) {
1173                                                 if (!map.containsKey(nextNode)) {
1174                                                         Long id = 0L;
1175                                                         try {
1176                                                                 id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
1177                                                         } catch (Exception ex) {
1178                                                                 logger.debug(ex);
1179                                                         }
1180                                                         if (id == null) {
1181                                                                 id = 0L;
1182                                                                 //per the spec
1183                                                         }
1184                                                         map.put(nextNode, id);
1185 
1186                                                 }
1187                                         }
1188                                 }
1189                         }
1190                         //dont forget this node
1191                         Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node  order by e.id desc")
1192                                 .setParameter("node", getNode()).setMaxResults(1).getSingleResult();
1193                         if (id == null) {
1194                                 id = 0L;
1195                         }
1196                         ChangeRecordIDType x = new ChangeRecordIDType();
1197                         x.setNodeID(getNode());
1198                         x.setOriginatingUSN(id);
1199                         ret.add(x);
1200 
1201                         tx.rollback();
1202                         long procTime = System.currentTimeMillis() - startTime;
1203                         serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
1204 
1205                 } catch (Exception drfm) {
1206                         throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));
1207 
1208                 } finally {
1209                         if (tx.isActive()) {
1210                                 tx.rollback();
1211                         }
1212                         em.close();
1213                 }
1214 
1215                 Iterator<Map.Entry<String, Long>> iterator = map.entrySet().iterator();
1216                 while (iterator.hasNext()) {
1217                         Map.Entry<String, Long> next = iterator.next();
1218                         ret.add(new ChangeRecordIDType(next.getKey(), next.getValue()));
1219                 }
1220                 return ret;
1221         }
1222 
1223         /**
1224          * this means that another node has a change and we need to pick up the
1225          * change and apply it to our local database.
1226          *
1227          * @param body
1228          * @throws DispositionReportFaultMessage
1229          */
1230         @Override
1231         public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
1232                 throws DispositionReportFaultMessage {
1233                 long startTime = System.currentTimeMillis();
1234 
1235                 //some other node just told us there's new records available, call
1236                 //getChangeRecords from the remote node asynch
1237                 new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
1238 
1239                 logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size() + " this node is " + getNode());
1240                 //if (!queue.contains(body.getNotifyingNode())) {
1241                 queue.add(body);
1242                 //}
1243                 long procTime = System.currentTimeMillis() - startTime;
1244                 serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
1245                         QueryStatus.SUCCESS, procTime);
1246         }
1247         private static Queue<NotifyChangeRecordsAvailable> queue = null;
1248 
1249         /**
1250          * transfers custody of an entity from node1/user1 to node2/user2
1251          *
1252          * assume this node is node 2.
1253          *
1254          * user1 on node1 requests a transfer token. node 1 issues the token.
1255          *
1256          * user1 now has a transfer token for their stuff user now takes the
1257          * token to node 2 and calls transferEntities
1258          * <img src="http://www.uddi.org/pubs/uddi-v3.0.2-20041019_files/image086.gif">
1259          *
1260          * @param body
1261          * @throws DispositionReportFaultMessage
1262          */
1263         @Override
1264         public void transferCustody(TransferCustody body)
1265                 throws DispositionReportFaultMessage {
1266                 long startTime = System.currentTimeMillis();
1267                 EntityManager em = PersistenceManager.getEntityManager();
1268                 EntityTransaction tx = em.getTransaction();
1269                 logger.info("Inbound transfer request (via replication api, node to node");
1270                 try {
1271                         tx.begin();
1272                 //*this node is transfering data to another node
1273                         //ValidateReplication.unsupportedAPICall();
1274                         //a remote node just told me to give up control of some of my entities
1275 
1276                         //EntityTransaction tx = em.getTransaction();
1277                         //confirm i have a replication config
1278                         boolean ok = false;
1279                         ReplicationConfiguration FetchEdges = ReplicationNotifier.FetchEdges();
1280                         if (FetchEdges != null) {
1281                                 for (int i = 0; i < FetchEdges.getOperator().size(); i++) {
1282                                         //confirm that the destination node is in the replication config
1283                                         if (FetchEdges.getOperator().get(i).getOperatorNodeID().equals(body.getTransferOperationalInfo().getNodeID())) {
1284                                                 ok = true;
1285                                                 break;
1286                                         }
1287                                 }
1288                         }
1289                         if (!ok) {
1290                                 throw new TransferNotAllowedException(new ErrorMessage("E_transferNotAllowedUnknownNode"));
1291                         }
1292 
1293                         new ValidateReplication(null).validateTransfer(em, body);
1294 
1295                         TransferEntities te = new TransferEntities();
1296                         te.setKeyBag(body.getKeyBag());
1297                         te.setTransferToken(body.getTransferToken());
1298                         te.setAuthInfo(null);
1299                         //make the change
1300                         //enqueue in replication notifier
1301                         //discard the token
1302                         logger.debug("request validated, processing transfer");
1303                         List<ChangeRecord> executeTransfer = new UDDICustodyTransferImpl().executeTransfer(te, em, body.getTransferOperationalInfo().getAuthorizedName(), body.getTransferOperationalInfo().getNodeID());
1304 
1305                         for (ChangeRecord c : executeTransfer) {
1306                                 try {
1307                                         c.setChangeID(new ChangeRecordIDType());
1308                                         c.getChangeID().setNodeID(getNode());
1309                                         c.getChangeID().setOriginatingUSN(null);
1310                                         ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(c));
1311                                 } catch (UnsupportedEncodingException ex) {
1312                                         logger.error("", ex);
1313                                 }
1314                         }
1315                         /**
1316                          * The custodial node must verify that it has granted
1317                          * permission to transfer the entities identified and
1318                          * that this permission is still valid. This operation
1319                          * is comprised of two steps:
1320                          *
1321                          * 1. Verification that the transferToken was issued by
1322                          * it, that it has not expired, that it represents the
1323                          * authority to transfer no more and no less than those
1324                          * entities identified by the businessKey and tModelKey
1325                          * elements and that all these entities are still valid
1326                          * and not yet transferred. The transferToken is
1327                          * invalidated if any of these conditions are not met.
1328                          *
1329                          * 2. If the conditions above are met, the custodial
1330                          * node will prevent any further changes to the entities
1331                          * identified by the businessKey and tModelKey elements
1332                          * identified. The entity will remain in this state
1333                          * until the replication stream indicates it has been
1334                          * successfully processed via the replication stream.
1335                          * Upon successful verification of the custody transfer
1336                          * request by the custodial node, an empty message is
1337                          * returned by it indicating the success of the request
1338                          * and acknowledging the custody transfer. Following the
1339                          * issue of the empty message, the custodial node will
1340                          * submit into the replication stream a
1341                          * changeRecordNewData providing in the operationalInfo,
1342                          * the nodeID accepting custody of the datum and the
1343                          * authorizedName of the publisher accepting ownership.
1344                          * The acknowledgmentRequested attribute of this change
1345                          * record MUST be set to "true".
1346                          *
1347                          *
1348                          *
1349                          * Finally, the custodial node invalidates the
1350                          * transferToken in order to prevent additional calls of
1351                          * the transfer_entities API.
1352                          */
1353                         tx.commit();
1354                         long procTime = System.currentTimeMillis() - startTime;
1355                         serviceCounter.update(ReplicationQuery.TRANSFER_CUSTODY,
1356                                 QueryStatus.SUCCESS, procTime);
1357                 } catch (DispositionReportFaultMessage d) {
1358                         logger.error("Unable to process node to node custody transfer ", d);
1359                         throw d;
1360                 } finally {
1361                         if (em != null && em.isOpen()) {
1362                                 em.close();
1363                         }
1364                         if (tx.isActive()) {
1365                                 tx.rollback();
1366                         }
1367                 }
1368         }
1369 
1370 }