View Javadoc
1   /*
2    * Copyright 2001-2008 The Apache Software Foundation.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  package org.apache.juddi.replication;
18  
19  import java.io.UnsupportedEncodingException;
20  import java.util.ArrayList;
21  import java.util.HashSet;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Queue;
25  import java.util.Set;
26  import java.util.Timer;
27  import java.util.TimerTask;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.logging.Level;
30  import java.util.logging.Logger;
31  import javax.persistence.EntityManager;
32  import javax.persistence.EntityTransaction;
33  import javax.persistence.Query;
34  import javax.xml.bind.JAXB;
35  import javax.xml.ws.BindingProvider;
36  import org.apache.commons.configuration.ConfigurationException;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.juddi.api_v3.Node;
40  import org.apache.juddi.config.AppConfig;
41  import org.apache.juddi.config.PersistenceManager;
42  import org.apache.juddi.config.Property;
43  import org.apache.juddi.mapping.MappingApiToModel;
44  import org.apache.juddi.mapping.MappingModelToApi;
45  import org.apache.juddi.model.ChangeRecord;
46  import org.apache.juddi.model.ReplicationConfiguration;
47  import org.apache.juddi.v3.client.UDDIService;
48  import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
49  import org.uddi.repl_v3.ChangeRecordIDType;
50  import org.uddi.repl_v3.CommunicationGraph.Edge;
51  import org.uddi.repl_v3.HighWaterMarkVectorType;
52  import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
53  import org.uddi.repl_v3.Operator;
54  import org.uddi.v3_service.UDDIReplicationPortType;
55  
56  /**
57   * Handles when local records have been changed, change journal storage and
58   * notifications to all remote replication nodes that something has been
59   * altered.
60   *
61   * @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>
62   *
63   */
64  public class ReplicationNotifier extends TimerTask {
65  
66          private static Log log = LogFactory.getLog(ReplicationNotifier.class);
67          private Timer timer = null;
68          private long startBuffer = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default 
69          private long interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
70          private static String node = null;
71          private static UDDIService uddiService = new UDDIService();
72  
73          /**
74           * default constructor
75           *
76           * @throws ConfigurationException
77           */
78          public ReplicationNotifier() throws ConfigurationException {
79                 super();
80                 init();
81          }
82          private synchronized void init() throws ConfigurationException {
83                  timer = new Timer(true);
84                  startBuffer=AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000l);
85                  interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000l);
86                  timer.scheduleAtFixedRate(this, startBuffer, interval);
87                  if (queue == null) {
88                          queue = new ConcurrentLinkedQueue();
89                  }
90                  node = AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ID, "UNDEFINED_NODE_NAME");
91          }
92  
93          @Override
94          public boolean cancel() {
95                  timer.cancel();
96                  //TODO notify other nodes that i'm going down
97                  return super.cancel();
98          }
99  
100         //ReplicationNotifier.Enqueue(this);
101         public synchronized static void enqueue(org.apache.juddi.model.ChangeRecord change) {
102                 if (queue == null) {
103                         queue = new ConcurrentLinkedQueue<org.apache.juddi.model.ChangeRecord>();
104                 }
105                 queue.add(change);
106         }
107 
108         public synchronized static void EnqueueRetransmit(org.uddi.repl_v3.ChangeRecord change) {
109                 if (queue2 == null) {
110                         queue2 = new ConcurrentLinkedQueue<org.uddi.repl_v3.ChangeRecord>();
111                 }
112                 queue2.add(change);
113         }
114         static Queue<org.apache.juddi.model.ChangeRecord> queue;
115         static Queue<org.uddi.repl_v3.ChangeRecord> queue2;
116 
117         /**
118          * Note: this is for locally originated changes only, see null null null         {@link org.apache.juddi.api.impl.UDDIReplicationImpl.PullTimerTask#PersistChangeRecord PersistChangeRecord
119          * } for how remote changes are processed
120          *
121          * @param j must be one of the UDDI save APIs
122          *
123          */
124         protected void ProcessChangeRecord(org.apache.juddi.model.ChangeRecord j) {
125                 //store and convert the changes to database model
126 
127                 //TODO need a switch to send the notification without persisting the record
128                 //this is to support multihop notifications
129                 EntityManager em = PersistenceManager.getEntityManager();
130                 EntityTransaction tx = null;
131                 try {
132                         tx = em.getTransaction();
133                         tx.begin();
134                         j.setIsAppliedLocally(true);
135                         em.persist(j);
136                         j.setOriginatingUSN(j.getId());
137                         em.merge(j);
138                         log.info("CR saved locally, it was from " + j.getNodeID()
139                                 + " USN:" + j.getOriginatingUSN()
140                                 + " Type:" + j.getRecordType().name()
141                                 + " Key:" + j.getEntityKey()
142                                 + " Local id:" + j.getId());
143                         tx.commit();
144                 } catch (Exception ex) {
145                         log.fatal("unable to store local change record locally!!", ex);
146                         if (tx != null && tx.isActive()) {
147                                 tx.rollback();
148                         }
149                         JAXB.marshal(MappingModelToApi.mapChangeRecord(j), System.out);
150                 } finally {
151                         em.close();
152                 }
153 
154                 log.debug("ChangeRecord: " + j.getId() + "," + j.getEntityKey() + "," + j.getNodeID() + "," + j.getOriginatingUSN() + "," + j.getRecordType().toString());
155                 SendNotifications(j.getId(), j.getNodeID(), false);
156 
157         }
158 
159         private void SendNotifications(Long id, String origin_node, boolean isRetrans) {
160 
161                 org.uddi.repl_v3.ReplicationConfiguration repcfg = FetchEdges();
162 
163                 if (repcfg == null) {
164                         log.debug("No replication configuration is defined!");
165                         return;
166 
167                 }
168                 if (id == null || origin_node == null) {
169                         log.fatal("Either the id is null or the origin_node is null. I can't send out this alert!!");
170                         //throw new Exception(node);
171                         return;
172                 }
173 
174                 Set<Object> destinationUrls = new HashSet<Object>();
175 
176                 /**
177                  * In the absence of a communicationGraph element from the
178                  * Replication Configuration Structure (although it's mandatory
179                  * in the xsd), all nodes listed in the node element MAY send
180                  * any and all messages to any other node of the registry.
181                  */
182                 if (repcfg.getCommunicationGraph() == null
183                         || repcfg.getCommunicationGraph().getEdge().isEmpty() && !isRetrans) {
184                         //no edges or graph defined, default to the operator list
185                         //retransmission only applies to non-directed-edge replication, thus the extra check
186                         for (Operator o : repcfg.getOperator()) {
187                                 //no need to tell myself about a change at myself or the origin
188                                 if (!o.getOperatorNodeID().equalsIgnoreCase(node) && !o.getOperatorNodeID().equalsIgnoreCase(origin_node)) {
189                                         destinationUrls.add(o.getSoapReplicationURL());
190                                 }
191                         }
192                 } else {
193                         //this is for directed graph replication
194                         //find all nodes that i need to notify
195                         Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
196                         while (iterator.hasNext()) {
197                                 Edge next = iterator.next();
198 
199                                 if (next.getMessageSender().equalsIgnoreCase(node)) {
200 
201                                         //this is my server and i need to transmit the notification to
202                                         String messageReceiver = next.getMessageReceiver();
203                                         PrimaryAlternate container = new PrimaryAlternate();
204                                         //pointless to send a notification to myself or the origin
205                                         if (!messageReceiver.equalsIgnoreCase(node) && !messageReceiver.equalsIgnoreCase(origin_node)) {
206                                                 //look up the endpoint urls
207                                                 for (int x = 0; x < repcfg.getOperator().size(); x++) {
208                                                         if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(messageReceiver)) {
209                                                                 container.primaryUrl = repcfg.getOperator().get(x).getSoapReplicationURL();
210                                                         }
211                                                 }
212                                                 for (int y = 0; y < next.getMessageReceiverAlternate().size(); y++) {
213                                                         for (int x = 0; x < repcfg.getOperator().size(); x++) {
214                                                                 if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(next.getMessageReceiverAlternate().get(y))) {
215                                                                         container.alternateUrls.add(repcfg.getOperator().get(x).getSoapReplicationURL());
216                                                                 }
217                                                         }
218                                                 }
219                                         }
220                                         if (container.primaryUrl != null) {
221                                                 destinationUrls.add(container);
222                                         } else {
223                                                 log.warn("Unable to find primary url for directed edge graph replication from this node " + node + " to "
224                                                         + "destination node " + next.getMessageReceiver() + " it will be ignored!");
225                                         }
226 
227                                 }
228 
229                         }
230 
231                 }
232 
233                 if (destinationUrls.isEmpty()) {
234                         log.debug("Something is bizarre with the replication config. I should have had at least one node to notify, but I have none!");
235                         return;
236                 }
237                 UDDIReplicationPortType x = uddiService.getUDDIReplicationPort();
238                 TransportSecurityHelper.applyTransportSecurity((BindingProvider) x);
239 
240                 for (Object s : destinationUrls) {
241 
242                         NotifyChangeRecordsAvailable req = new NotifyChangeRecordsAvailable();
243 
244                         req.setNotifyingNode(node);
245                         HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();
246 
247                         highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(origin_node, id));
248                         req.setChangesAvailable(highWaterMarkVectorType);
249 
250                         if (s instanceof String) {
251                                 SendNotification(x, (String) s, req);
252                         } else if (s instanceof PrimaryAlternate) {
253                                 //more complex directed graph stuff
254                                 PrimaryAlternate pa = (PrimaryAlternate) s;
255                                 if (!SendNotification(x, pa.primaryUrl, req)) {
256                                         for (String url : pa.alternateUrls) {
257                                                 if (SendNotification(x, url, req)) {
258                                                         break;
259                                                 }
260                                                 //no need to continue to additional alternates
261                                         }
262                                 } else {
263                                         //primary url succeeded, no further action required
264                                 }
265 
266                         }
267 
268                         //TODO the spec talks about control messages, should we even support it? seems pointless
269                 }
270 
271         }
272 
273         /**
274          * return true if successful
275          *
276          * @param x
277          * @param s
278          * @param req
279          * @return
280          */
281         private boolean SendNotification(UDDIReplicationPortType x, String s, NotifyChangeRecordsAvailable req) {
282                 ((BindingProvider) x).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, s);
283                 try {
284                         x.notifyChangeRecordsAvailable(req);
285                         log.info("Successfully sent change record available message to " + s + " this node: " + node);
286                         return true;
287                 } catch (Exception ex) {
288                         log.warn("Unable to send change notification to " + s + " this node: " + node + " reason: " + ex.getMessage());
289                         log.debug("Unable to send change notification to " + s, ex);
290                 }
291                 return false;
292         }
293 
294         private static class PrimaryAlternate {
295 
296                 String primaryUrl = null;
297                 List<String> alternateUrls = new ArrayList<String>();
298         }
299 
300         public synchronized void run() {
301                 log.debug("Replication thread triggered");
302                 if (queue == null) {
303                         queue = new ConcurrentLinkedQueue();
304                 }
305                 if (queue2 == null) {
306                         queue2 = new ConcurrentLinkedQueue();
307                 }
308                 //TODO revisie this
309                 if (!queue.isEmpty()) {
310                         log.info("Replication, Notifying nodes of new change records. " + queue.size() + " queued");
311                 }
312 
313                 //TODO check for replication config changes
314                 while (!queue.isEmpty()) {
315                         //for each change at this node
316 
317                         ChangeRecord j = queue.poll();
318                         ProcessChangeRecord(j);
319 
320                 }
321 
322                 while (!queue2.isEmpty()) {
323                         //for each change at this node
324 
325                         org.uddi.repl_v3.ChangeRecord j = queue2.poll();
326 
327                         ChangeRecord model = new ChangeRecord();
328                         try {
329                                 model = MappingApiToModel.mapChangeRecord(j);
330                         } catch (UnsupportedEncodingException ex) {
331                                 Logger.getLogger(ReplicationNotifier.class.getName()).log(Level.SEVERE, null, ex);
332                         }
333                         log.info("retransmitting CR notificationm entity owner: " + j.getChangeID().getNodeID() + " CR: " + j.getChangeID().getOriginatingUSN() + " key:" + model.getEntityKey() + " " + model.getRecordType().name() + " accepted locally:" + model.getIsAppliedLocally());
334                         SendNotifications(j.getChangeID().getOriginatingUSN(), j.getChangeID().getNodeID(), true);
335 
336                 }
337         }
338 
339         /**
340          * returns the latest version of the replication config or null if there
341          * is no config
342          *
343          * @return
344          */
345         public static org.uddi.repl_v3.ReplicationConfiguration FetchEdges() {
346 
347                 EntityManager em = PersistenceManager.getEntityManager();
348                 EntityTransaction tx = null;
349                 org.uddi.repl_v3.ReplicationConfiguration item = new org.uddi.repl_v3.ReplicationConfiguration();
350                 try {
351                         tx = em.getTransaction();
352                         tx.begin();
353                         Query q = em.createQuery("SELECT item FROM ReplicationConfiguration item order by item.serialNumber DESC");
354                         q.setMaxResults(1);
355                         ReplicationConfiguration results = (ReplicationConfiguration) q.getSingleResult();
356                         //   ReplicationConfiguration find = em.find(ReplicationConfiguration.class, null);
357                         if (results != null) {
358                                 MappingModelToApi.mapReplicationConfiguration(results, item);
359                         }
360 
361                         tx.commit();
362                         return item;
363                 } catch (Exception ex) {
364                         //log.error("error", ex);
365                         //no config available
366                         if (tx != null && tx.isActive()) {
367                                 tx.rollback();
368                         }
369                 } finally {
370                         em.close();
371                 }
372                 return null;
373         }
374 
375         @Deprecated
376         private Node getNode(String messageSender) {
377                 EntityManager em = PersistenceManager.getEntityManager();
378                 EntityTransaction tx = null;
379                 try {
380                         tx = em.getTransaction();
381                         tx.begin();
382                         Node api = new Node();
383                         org.apache.juddi.model.Node find = em.find(org.apache.juddi.model.Node.class, messageSender);
384                         if (find != null) {
385                                 MappingModelToApi.mapNode(find, api);
386                         }
387                         tx.commit();
388                         return api;
389                 } catch (Exception ex) {
390                         log.error("error", ex);
391                         if (tx != null && tx.isActive()) {
392                                 tx.rollback();
393                         }
394                 } finally {
395                         em.close();
396                 }
397                 return null;
398         }
399 }