This project has retired. For details please refer to its Attic page.
SubscriptionNotifier xref
View Javadoc
1   /*
2    * Copyright 2001-2008 The Apache Software Foundation.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  package org.apache.juddi.subscription;
18  
19  import java.io.IOException;
20  import java.util.Collection;
21  import java.util.Date;
22  import java.util.GregorianCalendar;
23  import java.util.Map;
24  import java.util.Timer;
25  import java.util.TimerTask;
26  import java.util.concurrent.ConcurrentHashMap;
27  
28  import javax.persistence.EntityManager;
29  import javax.persistence.EntityTransaction;
30  import javax.persistence.Query;
31  import javax.xml.datatype.DatatypeConfigurationException;
32  import javax.xml.datatype.DatatypeFactory;
33  import javax.xml.datatype.Duration;
34  
35  import org.apache.commons.configuration.ConfigurationException;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.juddi.api.impl.ServiceCounterLifecycleResource;
39  import org.apache.juddi.api.impl.UDDIPublicationImpl;
40  import org.apache.juddi.api.impl.UDDISecurityImpl;
41  import org.apache.juddi.api.impl.UDDIServiceCounter;
42  import org.apache.juddi.api.impl.UDDISubscriptionImpl;
43  import org.apache.juddi.api_v3.AccessPointType;
44  import org.apache.juddi.config.AppConfig;
45  import org.apache.juddi.config.PersistenceManager;
46  import org.apache.juddi.config.Property;
47  import org.apache.juddi.model.Subscription;
48  import org.apache.juddi.model.UddiEntityPublisher;
49  import org.apache.juddi.subscription.notify.Notifier;
50  import org.apache.juddi.subscription.notify.NotifierFactory;
51  import org.uddi.sub_v3.CoveragePeriod;
52  import org.uddi.sub_v3.GetSubscriptionResults;
53  import org.uddi.sub_v3.SubscriptionResultsList;
54  import org.uddi.subr_v3.NotifySubscriptionListener;
55  import org.uddi.v3_service.DispositionReportFaultMessage;
56  
57  /**
58   *
59   * @author <a href="mailto:kstam@apache.org">Kurt T Stam</a>
60   *
61   */
62  public class SubscriptionNotifier extends TimerTask {
63  
64          private static final Log log = LogFactory.getLog(SubscriptionNotifier.class);
65          private Timer timer = null;
66          private final long startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default 
67          private final long interval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
68          private final long acceptableLagTime = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_ACCEPTABLE_LAGTIME, 1000l); //1000 milliseconds
69          private final int maxTries = AppConfig.getConfiguration().getInt(Property.JUDDI_NOTIFICATION_MAX_TRIES, 3);
70          private final long badListResetInterval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_LIST_RESET_INTERVAL, 1000l * 3600); //one hour
71          /**
72           * @since 3.2
73           */
74          private final boolean sendToken = AppConfig.getConfiguration().getBoolean(Property.JUDDI_NOTIFICATION_SENDAUTHTOKEN, false);
75          private final UDDISubscriptionImpl subscriptionImpl = new UDDISubscriptionImpl();
76          private final Boolean alwaysNotify = false;
77          private Date desiredDate = null;
78          private int lastUpdateCounter;
79          private final UDDIServiceCounter serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIPublicationImpl.class);
80          private final String[] attributes = {
81                  "save_business", "save_service", "save_binding", "save_tmodel",
82                  "delete_business", "delete_service", "delete_binding", "delete_tmodel",
83                  "add_publisherassertions", "set_publisherassertions", "delete_publisherassertions"
84          };
85          private static Map<String, Integer> badNotifications = new ConcurrentHashMap<String, Integer>();
86          private static Date lastBadNotificationReset = new Date();
87  
88          /**
89           * default constructor
90           *
91           * @throws ConfigurationException
92           */
93          public SubscriptionNotifier() throws ConfigurationException {
94                  super();
95                  timer = new Timer(true);
96                  timer.scheduleAtFixedRate(this, startBuffer, interval);
97          }
98  
99          @Override
100         public boolean cancel() {
101                 timer.cancel();
102                 return super.cancel();
103         }
104 
105         /**
106          * If the CRUD methods on the publication API where not called, this
107          * registry node does not contain changes. If the registry database is
108          * shared with other registry nodes and one of those registries pushed
109          * in a change, then that registry node will take care of sending out
110          * notifications.
111          *
112          * @return true/false
113          */
114         protected boolean registryMayContainUpdates() {
115                 boolean isUpdated = false;
116                 int updateCounter = 0;
117 		//if the desiredDate is set it means that we've declined sending out a notification before
118                 //because the a client did not want a notification yet. However if this desired
119                 //notification time has come we should try sending out the notification now.
120                 if (desiredDate != null && new Date().getTime() > desiredDate.getTime()) {
121                         return true;
122                 }
123                 try {
124                         for (String attribute : attributes) {
125                                 String counter = serviceCounter.getAttribute(attribute + " successful queries").toString();
126                                 updateCounter += Integer.valueOf(counter);
127                         }
128 			// if the counts are not the same something has changed, 
129                         // this accounts for the case where the counters where reset.
130                         if (updateCounter != lastUpdateCounter) {
131                                 lastUpdateCounter = updateCounter;
132                                 isUpdated = true;
133                         }
134                 } catch (Exception e) {
135                         log.error(e.getMessage(), e);
136                 }
137                 return isUpdated;
138         }
139 
140         @Override
141         public synchronized void run() {
142                 if (badListResetInterval > 0 && new Date().getTime() > lastBadNotificationReset.getTime() + badListResetInterval) {
143                         badNotifications = new ConcurrentHashMap<String, Integer>();
144                         lastBadNotificationReset = new Date();
145                         log.debug("badNotificationList was reset");
146                 }
147                 if ((firedOnTime(scheduledExecutionTime()) || alwaysNotify) && registryMayContainUpdates()) {
148                         long startTime = System.currentTimeMillis();
149                         desiredDate = null;
150                         log.info("Start Notification background task; checking if subscription notifications need to be send out..");
151 
152                         Collection<Subscription> subscriptions = getAllAsyncSubscriptions();
153                         for (Subscription subscription : subscriptions) {
154 
155                                 if (subscription.getExpiresAfter() == null || subscription.getExpiresAfter().getTime() > startTime
156                                         || !isTemporarilyDisabled(subscription.getSubscriptionKey())) {
157                                         try {
158 						//build a query with a coverage period from the lastNotified time to 
159                                                 //now (the scheduled Execution time)
160                                                 Date notificationDate = new Date(scheduledExecutionTime());
161                                                 GetSubscriptionResults getSubscriptionResults
162                                                         = buildGetSubscriptionResults(subscription, notificationDate);
163                                                 if (getSubscriptionResults != null) {
164                                                         getSubscriptionResults.setSubscriptionKey(subscription.getSubscriptionKey());
165                                                         UddiEntityPublisher publisher = new UddiEntityPublisher();
166                                                         publisher.setAuthorizedName(subscription.getAuthorizedName());
167                                                         SubscriptionResultsList resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher);
168                                                         String token = resultList.getChunkToken();
169                                                         if (resultListContainsChanges(resultList)) {
170                                                                 log.info("We have a change and need to notify " + subscription.getSubscriptionKey());
171                                                                 resultList.setChunkToken(null);
172                                                                 //Note that the chunkToken is not returned with this structure for this API.  
173                                                                 notify(getSubscriptionResults, resultList, notificationDate);
174                                                         } else {
175                                                                 log.info("No changes where recorded, no need to notify.");
176                                                         }
177                                                         while (!token.equalsIgnoreCase("0")) {
178                                                                 resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher);
179                                                                 if (resultListContainsChanges(resultList)) {
180                                                                         log.info("We have a change and need to notify " + subscription.getSubscriptionKey());
181                                                                         resultList.setChunkToken(null);
182                                                                         //Note that the chunkToken is not returned with this structure for this API.  
183                                                                         notify(getSubscriptionResults, resultList, notificationDate);
184                                                                 } else {
185                                                                         log.info("No changes where recorded, no need to notify.");
186                                                                 }
187                                                         }
188 
189                                                 }
190                                         } catch (Exception e) {
191                                                 log.error("Could not obtain subscriptionResult for subscriptionKey "
192                                                         + subscription.getSubscriptionKey() + ". " + e.getMessage(), e);
193                                         }
194                                 } else {
195                                         // the subscription expired, we should delete it
196                                         log.info("Subcription with key " + subscription.getSubscriptionKey()
197                                                 + " expired " + subscription.getExpiresAfter());
198                                         deleteSubscription(subscription);
199                                 }
200                         }
201                         long endTime = System.currentTimeMillis();
202 
203                         if ((endTime - startTime) > interval) {
204                                 log.info("Notification background task duration exceeds the JUDDI_NOTIFICATION_INTERVAL"
205                                         + " of " + interval + ". Notification background task took "
206                                         + (endTime - startTime) + " milliseconds.");
207                         } else {
208                                 log.info("Notification background task took " + (endTime - startTime) + " milliseconds.");
209                         }
210                 } else {
211                         log.debug("Skipping current notification cycle because lagtime is too great.");
212                 }
213         }
214 
215         /**
216          * Checks to see that the event are fired on time. If they are late this
217          * may indicate that the server is under load. The acceptableLagTime is
218          * configurable using the "juddi.notification.acceptable.lagtime"
219          * property and is defaulted to 500ms. A negative value means that you
220          * do not care about the lag time and you simply always want to go do
221          * the notification work.
222          *
223          * @param scheduleExecutionTime
224          * @return true if the server is within the acceptable latency lag.
225          */
226         private boolean firedOnTime(long scheduleExecutionTime) {
227                 long lagTime = System.currentTimeMillis() - scheduleExecutionTime;
228                 if (lagTime <= acceptableLagTime || acceptableLagTime < 0) {
229                         return true;
230                 } else {
231                         log.debug("NotificationTimer is lagging " + lagTime + " milli seconds behind. A lag time "
232                                 + "which exceeds an acceptable lagtime of " + acceptableLagTime + "ms indicates "
233                                 + "that the registry server is under load or was in sleep mode. We are therefore skipping this notification "
234                                 + "cycle.");
235                         return false;
236                 }
237         }
238 
239         protected GetSubscriptionResults buildGetSubscriptionResults(Subscription subscription, Date endPoint)
240                 throws DispositionReportFaultMessage, DatatypeConfigurationException {
241 
242                 GetSubscriptionResults getSubscriptionResults = null;
243                 Duration duration = TypeConvertor.convertStringToDuration(subscription.getNotificationInterval());
244                 Date startPoint = subscription.getLastNotified();
245                 Date nextDesiredNotificationDate = null;
246                 if (startPoint == null) {
247                         startPoint = subscription.getCreateDate();
248                 }
249                 nextDesiredNotificationDate = new Date(startPoint.getTime());
250                 duration.addTo(nextDesiredNotificationDate);
251 		//nextDesiredNotificationDate = lastTime + the Interval Duration, which should be:
252                 //AFTER the lastNotified time and BEFORE the endTime (current time). If it is
253                 //after the endTime, then the user does not want a notification yet, so we accumulate.
254                 if (subscription.getLastNotified() == null || nextDesiredNotificationDate.after(startPoint) && nextDesiredNotificationDate.before(endPoint)) {
255                         getSubscriptionResults = new GetSubscriptionResults();
256                         CoveragePeriod period = new CoveragePeriod();
257                         GregorianCalendar calendar = new GregorianCalendar();
258                         calendar.setTimeInMillis(startPoint.getTime());
259                         period.setStartPoint(DatatypeFactory.newInstance().newXMLGregorianCalendar(calendar));
260                         calendar.setTimeInMillis(endPoint.getTime());
261                         period.setEndPoint(DatatypeFactory.newInstance().newXMLGregorianCalendar(calendar));
262                         if (log.isDebugEnabled()) {
263                                 log.debug("Period " + period.getStartPoint() + " " + period.getEndPoint());
264                         }
265                         getSubscriptionResults.setCoveragePeriod(period);
266                 } else {
267                         log.info("Client does not yet want a notification. The next desidered notification Date " + nextDesiredNotificationDate + ". The current interval ["
268                                 + startPoint + " , " + endPoint + "] therefore skipping this notification cycle.");
269                         if (desiredDate == null || nextDesiredNotificationDate.getTime() < desiredDate.getTime()) {
270                                 desiredDate = nextDesiredNotificationDate;
271                         }
272                 }
273                 return getSubscriptionResults;
274 
275         }
276 
277         protected boolean resultListContainsChanges(SubscriptionResultsList resultList) {
278                 if (resultList == null) {
279                         return false;
280                 }
281                 if (resultList.getBindingDetail() != null || resultList.getBusinessDetail() != null
282                         || resultList.getBusinessList() != null || resultList.getServiceDetail() != null
283                         || resultList.getServiceList() != null || resultList.getTModelDetail() != null
284                         || resultList.getTModelList() != null || resultList.getRelatedBusinessesList() != null
285                         || resultList.getAssertionStatusReport() !=null) {
286                         return true;
287                 }
288                 //When the response is 'brief', or when there are deleted only keyBags are used.
289                 if (resultList.getKeyBag() != null && resultList.getKeyBag().size() > 0) {
290                         return true;
291                 }
292                 //there are no changes to what was subscribed to
293                 return false;
294         }
295 
296         /**
297          * Obtains all subscriptions in the system.
298          *
299          * @return Collection of All Subscriptions in the system.
300          */
301         @SuppressWarnings("unchecked")
302         protected Collection<Subscription> getAllAsyncSubscriptions() {
303                 Collection<Subscription> subscriptions = null;
304                 EntityManager em = PersistenceManager.getEntityManager();
305                 EntityTransaction tx = em.getTransaction();
306                 try {
307                         tx.begin();
308                         Query query = em.createQuery("SELECT s FROM Subscription s WHERE s.bindingKey IS NOT NULL");
309                         subscriptions = (Collection<Subscription>) query.getResultList();
310                         tx.commit();
311                 } finally {
312                         if (tx.isActive()) {
313                                 tx.rollback();
314                         }
315                         em.close();
316                 }
317                 return subscriptions;
318         }
319 
320         /**
321          * Deletes the subscription. i.e. when it is expired.
322          *
323          * @param subscription
324          */
325         protected void deleteSubscription(Subscription subscription) {
326                 EntityManager em = PersistenceManager.getEntityManager();
327                 EntityTransaction tx = em.getTransaction();
328                 try {
329                         tx.begin();
330                         em.remove(subscription);
331                         tx.commit();
332                 } finally {
333                         if (tx.isActive()) {
334                                 tx.rollback();
335                         }
336                         em.close();
337                 }
338         }
339 
340         /**
341          * Sends out the notifications.
342          *
343          * @param getSubscriptionResults
344          * @param resultList
345          * @param notificationDate
346          */
347         protected void notify(GetSubscriptionResults getSubscriptionResults, SubscriptionResultsList resultList, Date notificationDate) {
348                 EntityManager em = PersistenceManager.getEntityManager();
349                 EntityTransaction tx = em.getTransaction();
350                 try {
351                         String subscriptionKey = resultList.getSubscription().getSubscriptionKey();
352                         org.apache.juddi.model.Subscription modelSubscription
353                                 = em.find(org.apache.juddi.model.Subscription.class, subscriptionKey);
354                         Date lastNotifiedDate = modelSubscription.getLastNotified();
355                         //now log to the db that we are sending the notification.
356                         tx.begin();
357                         modelSubscription.setLastNotified(notificationDate);
358                         em.persist(modelSubscription);
359                         tx.commit();
360 
361                         org.apache.juddi.model.BindingTemplate bindingTemplate = em.find(org.apache.juddi.model.BindingTemplate.class, modelSubscription.getBindingKey());
362                         NotifySubscriptionListener body = new NotifySubscriptionListener();
363 //			if (resultList.getServiceList()!=null && resultList.getServiceList().getServiceInfos()!=null &&
364 //					resultList.getServiceList().getServiceInfos().getServiceInfo().size() == 0) {
365 //				resultList.getServiceList().setServiceInfos(null);
366 //			}
367                         body.setSubscriptionResultsList(resultList);
368 
369                          //TODO if the endpoint requires an auth token, look up the security endpoint of the remote registry
370                         //via ClientSubscriptionInfo
371                         if (sendToken) {
372                                 String authorizedName = modelSubscription.getAuthorizedName();
373                                 UDDISecurityImpl security = new UDDISecurityImpl();
374 
375                                 if (authorizedName != null) { // add a security token if needed
376                                         try {
377                                                 //obtain a token for this publisher
378                                                 org.uddi.api_v3.AuthToken token = security.getAuthToken(authorizedName);
379                                                 body.setAuthInfo(token.getAuthInfo());
380                                         } catch (DispositionReportFaultMessage e) {
381                                                 body.setAuthInfo("Failed to generate token, please contact UDDI admin");
382                                                 log.error(e.getMessage(), e);
383                                         }
384                                 }
385                         }
386 
387                         if (bindingTemplate != null) {
388                                 if (AccessPointType.END_POINT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType())
389                                         || AccessPointType.WSDL_DEPLOYMENT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType())) {
390                                         try {
391                                                 Notifier notifier = new NotifierFactory().getNotifier(bindingTemplate);
392                                                 if (notifier != null) {
393                                                         log.info("Sending out notification to " + bindingTemplate.getAccessPointUrl());
394                                                         notifier.notifySubscriptionListener(body);
395                                                         //there maybe more chunks we have to send
396                                                         String chunkToken = body.getSubscriptionResultsList().getChunkToken();
397                                                         while (chunkToken != null) {
398                                                                 UddiEntityPublisher publisher = new UddiEntityPublisher();
399                                                                 publisher.setAuthorizedName(modelSubscription.getAuthorizedName());
400                                                                 log.debug("Sending out next chunk: " + chunkToken + " to " + bindingTemplate.getAccessPointUrl());
401                                                                 getSubscriptionResults.setChunkToken(chunkToken);
402                                                                 resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher);
403                                                                 body.setSubscriptionResultsList(resultList);
404                                                                 if (resultListContainsChanges(resultList)) //if (!IsEmpty(resultList))
405                                                                 {
406                                                                         notifier.notifySubscriptionListener(body);
407                                                                 }
408                                                                 chunkToken = body.getSubscriptionResultsList().getChunkToken();
409                                                         }
410                                                         //successful notification so remove from the badNotificationList
411                                                         if (badNotifications.containsKey(resultList.getSubscription().getSubscriptionKey())) {
412                                                                 badNotifications.remove(resultList.getSubscription().getSubscriptionKey());
413                                                         }
414                                                 }
415                                         } catch (Exception e) {
416                                                 if (e.getCause() instanceof IOException) {
417                                                         addBadNotificationToList(subscriptionKey, bindingTemplate.getAccessPointUrl());
418                                                         //we could not notify so compensate the transaction above
419                                                         modelSubscription.setLastNotified(lastNotifiedDate);
420                                                         tx.begin();
421                                                         em.persist(modelSubscription);
422                                                         tx.commit();
423 						//} else {
424                                                         //log.warn("Unexpected WebServiceException " + e.getMessage() + e.getCause());
425                                                 }
426                                                 log.error("Unexpected notification exception:" + e.getClass().getCanonicalName() + " " + e.getMessage() + " " + e.getCause());
427                                                 log.debug("Unexpected notification exception:" + e.getClass().getCanonicalName() + " " + e.getMessage() + " " + e.getCause(), e);
428                                         }
429                                 } else {
430                                         log.info("Binding " + bindingTemplate.getEntityKey() + " has an unsupported binding type of "
431                                                 + bindingTemplate.getAccessPointType() + ". Only "
432                                                 + AccessPointType.END_POINT.toString() + " and "
433                                                 + AccessPointType.WSDL_DEPLOYMENT.toString() + " are supported.");
434                                         addBadNotificationToList(subscriptionKey, bindingTemplate.getAccessPointType() + " not supported");
435                                 }
436                         } else {
437                                 log.info("There is no valid binding template defined for this subscription: " + modelSubscription.getBindingKey());
438                                 addBadNotificationToList(subscriptionKey, modelSubscription.getBindingKey() + " not found");
439                         }
440 
441                 } finally {
442                         if (tx.isActive()) {
443                                 tx.rollback();
444                         }
445                         em.close();
446                 }
447         }
448 
449         protected UDDISubscriptionImpl getSubscriptionImpl() {
450                 return subscriptionImpl;
451         }
452 
453         private boolean isTemporarilyDisabled(String subscriptionKey) {
454                 if (maxTries > 0 && badNotifications.containsKey(subscriptionKey) && badNotifications.get(subscriptionKey) > maxTries) {
455                         log.debug("Subscription " + subscriptionKey + " is temperarily disabled. The notification endpoint"
456                                 + " could not be reached more then " + maxTries + " times");
457                         return true;
458                 }
459                 return false;
460         }
461 
462         private int addBadNotificationToList(String subscriptionKey, String endPoint) {
463                 Integer numberOfBadNotifications = 0;
464                 if (badNotifications.containsKey(subscriptionKey)) {
465                         numberOfBadNotifications = badNotifications.get(subscriptionKey);
466                 }
467                 badNotifications.put(subscriptionKey, ++numberOfBadNotifications);
468                 log.debug("bad notification number " + numberOfBadNotifications + " for subscription "
469                         + subscriptionKey + " " + endPoint);
470                 return numberOfBadNotifications;
471         }
472 
473 }