This project has retired. For details please refer to its
Attic page.
SubscriptionNotifier xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.juddi.subscription;
18
19 import java.io.IOException;
20 import java.util.Collection;
21 import java.util.Date;
22 import java.util.GregorianCalendar;
23 import java.util.Map;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.ConcurrentHashMap;
27
28 import javax.persistence.EntityManager;
29 import javax.persistence.EntityTransaction;
30 import javax.persistence.Query;
31 import javax.xml.datatype.DatatypeConfigurationException;
32 import javax.xml.datatype.DatatypeFactory;
33 import javax.xml.datatype.Duration;
34
35 import org.apache.commons.configuration.ConfigurationException;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.juddi.api.impl.ServiceCounterLifecycleResource;
39 import org.apache.juddi.api.impl.UDDIPublicationImpl;
40 import org.apache.juddi.api.impl.UDDISecurityImpl;
41 import org.apache.juddi.api.impl.UDDIServiceCounter;
42 import org.apache.juddi.api.impl.UDDISubscriptionImpl;
43 import org.apache.juddi.api_v3.AccessPointType;
44 import org.apache.juddi.config.AppConfig;
45 import org.apache.juddi.config.PersistenceManager;
46 import org.apache.juddi.config.Property;
47 import org.apache.juddi.model.Subscription;
48 import org.apache.juddi.model.UddiEntityPublisher;
49 import org.apache.juddi.subscription.notify.Notifier;
50 import org.apache.juddi.subscription.notify.NotifierFactory;
51 import org.uddi.sub_v3.CoveragePeriod;
52 import org.uddi.sub_v3.GetSubscriptionResults;
53 import org.uddi.sub_v3.SubscriptionResultsList;
54 import org.uddi.subr_v3.NotifySubscriptionListener;
55 import org.uddi.v3_service.DispositionReportFaultMessage;
56
57
58
59
60
61
62 public class SubscriptionNotifier extends TimerTask {
63
64 private static final Log log = LogFactory.getLog(SubscriptionNotifier.class);
65 private Timer timer = null;
66 private final long startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l);
67 private final long interval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l);
68 private final long acceptableLagTime = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_ACCEPTABLE_LAGTIME, 1000l);
69 private final int maxTries = AppConfig.getConfiguration().getInt(Property.JUDDI_NOTIFICATION_MAX_TRIES, 3);
70 private final long badListResetInterval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_LIST_RESET_INTERVAL, 1000l * 3600);
71
72
73
74 private final boolean sendToken = AppConfig.getConfiguration().getBoolean(Property.JUDDI_NOTIFICATION_SENDAUTHTOKEN, false);
75 private final UDDISubscriptionImpl subscriptionImpl = new UDDISubscriptionImpl();
76 private final Boolean alwaysNotify = false;
77 private Date desiredDate = null;
78 private int lastUpdateCounter;
79 private final UDDIServiceCounter serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIPublicationImpl.class);
80 private final String[] attributes = {
81 "save_business", "save_service", "save_binding", "save_tmodel",
82 "delete_business", "delete_service", "delete_binding", "delete_tmodel",
83 "add_publisherassertions", "set_publisherassertions", "delete_publisherassertions"
84 };
85 private static Map<String, Integer> badNotifications = new ConcurrentHashMap<String, Integer>();
86 private static Date lastBadNotificationReset = new Date();
87
88
89
90
91
92
93 public SubscriptionNotifier() throws ConfigurationException {
94 super();
95 timer = new Timer(true);
96 timer.scheduleAtFixedRate(this, startBuffer, interval);
97 }
98
99 @Override
100 public boolean cancel() {
101 timer.cancel();
102 return super.cancel();
103 }
104
105
106
107
108
109
110
111
112
113
114 protected boolean registryMayContainUpdates() {
115 boolean isUpdated = false;
116 int updateCounter = 0;
117
118
119
120 if (desiredDate != null && new Date().getTime() > desiredDate.getTime()) {
121 return true;
122 }
123 try {
124 for (String attribute : attributes) {
125 String counter = serviceCounter.getAttribute(attribute + " successful queries").toString();
126 updateCounter += Integer.valueOf(counter);
127 }
128
129
130 if (updateCounter != lastUpdateCounter) {
131 lastUpdateCounter = updateCounter;
132 isUpdated = true;
133 }
134 } catch (Exception e) {
135 log.error(e.getMessage(), e);
136 }
137 return isUpdated;
138 }
139
140 @Override
141 public synchronized void run() {
142 if (badListResetInterval > 0 && new Date().getTime() > lastBadNotificationReset.getTime() + badListResetInterval) {
143 badNotifications = new ConcurrentHashMap<String, Integer>();
144 lastBadNotificationReset = new Date();
145 log.debug("badNotificationList was reset");
146 }
147 if ((firedOnTime(scheduledExecutionTime()) || alwaysNotify) && registryMayContainUpdates()) {
148 long startTime = System.currentTimeMillis();
149 desiredDate = null;
150 log.info("Start Notification background task; checking if subscription notifications need to be send out..");
151
152 Collection<Subscription> subscriptions = getAllAsyncSubscriptions();
153 for (Subscription subscription : subscriptions) {
154
155 if (subscription.getExpiresAfter() == null || subscription.getExpiresAfter().getTime() > startTime
156 || !isTemporarilyDisabled(subscription.getSubscriptionKey())) {
157 try {
158
159
160 Date notificationDate = new Date(scheduledExecutionTime());
161 GetSubscriptionResults getSubscriptionResults
162 = buildGetSubscriptionResults(subscription, notificationDate);
163 if (getSubscriptionResults != null) {
164 getSubscriptionResults.setSubscriptionKey(subscription.getSubscriptionKey());
165 UddiEntityPublisher publisher = new UddiEntityPublisher();
166 publisher.setAuthorizedName(subscription.getAuthorizedName());
167 SubscriptionResultsList resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher);
168 String token = resultList.getChunkToken();
169 if (resultListContainsChanges(resultList)) {
170 log.info("We have a change and need to notify " + subscription.getSubscriptionKey());
171 resultList.setChunkToken(null);
172
173 notify(getSubscriptionResults, resultList, notificationDate);
174 } else {
175 log.info("No changes where recorded, no need to notify.");
176 }
177 while (!token.equalsIgnoreCase("0")) {
178 resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher);
179 if (resultListContainsChanges(resultList)) {
180 log.info("We have a change and need to notify " + subscription.getSubscriptionKey());
181 resultList.setChunkToken(null);
182
183 notify(getSubscriptionResults, resultList, notificationDate);
184 } else {
185 log.info("No changes where recorded, no need to notify.");
186 }
187 }
188
189 }
190 } catch (Exception e) {
191 log.error("Could not obtain subscriptionResult for subscriptionKey "
192 + subscription.getSubscriptionKey() + ". " + e.getMessage(), e);
193 }
194 } else {
195
196 log.info("Subcription with key " + subscription.getSubscriptionKey()
197 + " expired " + subscription.getExpiresAfter());
198 deleteSubscription(subscription);
199 }
200 }
201 long endTime = System.currentTimeMillis();
202
203 if ((endTime - startTime) > interval) {
204 log.info("Notification background task duration exceeds the JUDDI_NOTIFICATION_INTERVAL"
205 + " of " + interval + ". Notification background task took "
206 + (endTime - startTime) + " milliseconds.");
207 } else {
208 log.info("Notification background task took " + (endTime - startTime) + " milliseconds.");
209 }
210 } else {
211 log.debug("Skipping current notification cycle because lagtime is too great.");
212 }
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226 private boolean firedOnTime(long scheduleExecutionTime) {
227 long lagTime = System.currentTimeMillis() - scheduleExecutionTime;
228 if (lagTime <= acceptableLagTime || acceptableLagTime < 0) {
229 return true;
230 } else {
231 log.debug("NotificationTimer is lagging " + lagTime + " milli seconds behind. A lag time "
232 + "which exceeds an acceptable lagtime of " + acceptableLagTime + "ms indicates "
233 + "that the registry server is under load or was in sleep mode. We are therefore skipping this notification "
234 + "cycle.");
235 return false;
236 }
237 }
238
239 protected GetSubscriptionResults buildGetSubscriptionResults(Subscription subscription, Date endPoint)
240 throws DispositionReportFaultMessage, DatatypeConfigurationException {
241
242 GetSubscriptionResults getSubscriptionResults = null;
243 Duration duration = TypeConvertor.convertStringToDuration(subscription.getNotificationInterval());
244 Date startPoint = subscription.getLastNotified();
245 Date nextDesiredNotificationDate = null;
246 if (startPoint == null) {
247 startPoint = subscription.getCreateDate();
248 }
249 nextDesiredNotificationDate = new Date(startPoint.getTime());
250 duration.addTo(nextDesiredNotificationDate);
251
252
253
254 if (subscription.getLastNotified() == null || nextDesiredNotificationDate.after(startPoint) && nextDesiredNotificationDate.before(endPoint)) {
255 getSubscriptionResults = new GetSubscriptionResults();
256 CoveragePeriod period = new CoveragePeriod();
257 GregorianCalendar calendar = new GregorianCalendar();
258 calendar.setTimeInMillis(startPoint.getTime());
259 period.setStartPoint(DatatypeFactory.newInstance().newXMLGregorianCalendar(calendar));
260 calendar.setTimeInMillis(endPoint.getTime());
261 period.setEndPoint(DatatypeFactory.newInstance().newXMLGregorianCalendar(calendar));
262 if (log.isDebugEnabled()) {
263 log.debug("Period " + period.getStartPoint() + " " + period.getEndPoint());
264 }
265 getSubscriptionResults.setCoveragePeriod(period);
266 } else {
267 log.info("Client does not yet want a notification. The next desidered notification Date " + nextDesiredNotificationDate + ". The current interval ["
268 + startPoint + " , " + endPoint + "] therefore skipping this notification cycle.");
269 if (desiredDate == null || nextDesiredNotificationDate.getTime() < desiredDate.getTime()) {
270 desiredDate = nextDesiredNotificationDate;
271 }
272 }
273 return getSubscriptionResults;
274
275 }
276
277 protected boolean resultListContainsChanges(SubscriptionResultsList resultList) {
278 if (resultList == null) {
279 return false;
280 }
281 if (resultList.getBindingDetail() != null || resultList.getBusinessDetail() != null
282 || resultList.getBusinessList() != null || resultList.getServiceDetail() != null
283 || resultList.getServiceList() != null || resultList.getTModelDetail() != null
284 || resultList.getTModelList() != null || resultList.getRelatedBusinessesList() != null
285 || resultList.getAssertionStatusReport() !=null) {
286 return true;
287 }
288
289 if (resultList.getKeyBag() != null && resultList.getKeyBag().size() > 0) {
290 return true;
291 }
292
293 return false;
294 }
295
296
297
298
299
300
301 @SuppressWarnings("unchecked")
302 protected Collection<Subscription> getAllAsyncSubscriptions() {
303 Collection<Subscription> subscriptions = null;
304 EntityManager em = PersistenceManager.getEntityManager();
305 EntityTransaction tx = em.getTransaction();
306 try {
307 tx.begin();
308 Query query = em.createQuery("SELECT s FROM Subscription s WHERE s.bindingKey IS NOT NULL");
309 subscriptions = (Collection<Subscription>) query.getResultList();
310 tx.commit();
311 } finally {
312 if (tx.isActive()) {
313 tx.rollback();
314 }
315 em.close();
316 }
317 return subscriptions;
318 }
319
320
321
322
323
324
325 protected void deleteSubscription(Subscription subscription) {
326 EntityManager em = PersistenceManager.getEntityManager();
327 EntityTransaction tx = em.getTransaction();
328 try {
329 tx.begin();
330 em.remove(subscription);
331 tx.commit();
332 } finally {
333 if (tx.isActive()) {
334 tx.rollback();
335 }
336 em.close();
337 }
338 }
339
340
341
342
343
344
345
346
347 protected void notify(GetSubscriptionResults getSubscriptionResults, SubscriptionResultsList resultList, Date notificationDate) {
348 EntityManager em = PersistenceManager.getEntityManager();
349 EntityTransaction tx = em.getTransaction();
350 try {
351 String subscriptionKey = resultList.getSubscription().getSubscriptionKey();
352 org.apache.juddi.model.Subscription modelSubscription
353 = em.find(org.apache.juddi.model.Subscription.class, subscriptionKey);
354 Date lastNotifiedDate = modelSubscription.getLastNotified();
355
356 tx.begin();
357 modelSubscription.setLastNotified(notificationDate);
358 em.persist(modelSubscription);
359 tx.commit();
360
361 org.apache.juddi.model.BindingTemplate bindingTemplate = em.find(org.apache.juddi.model.BindingTemplate.class, modelSubscription.getBindingKey());
362 NotifySubscriptionListener body = new NotifySubscriptionListener();
363
364
365
366
367 body.setSubscriptionResultsList(resultList);
368
369
370
371 if (sendToken) {
372 String authorizedName = modelSubscription.getAuthorizedName();
373 UDDISecurityImpl security = new UDDISecurityImpl();
374
375 if (authorizedName != null) {
376 try {
377
378 org.uddi.api_v3.AuthToken token = security.getAuthToken(authorizedName);
379 body.setAuthInfo(token.getAuthInfo());
380 } catch (DispositionReportFaultMessage e) {
381 body.setAuthInfo("Failed to generate token, please contact UDDI admin");
382 log.error(e.getMessage(), e);
383 }
384 }
385 }
386
387 if (bindingTemplate != null) {
388 if (AccessPointType.END_POINT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType())
389 || AccessPointType.WSDL_DEPLOYMENT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType())) {
390 try {
391 Notifier notifier = new NotifierFactory().getNotifier(bindingTemplate);
392 if (notifier != null) {
393 log.info("Sending out notification to " + bindingTemplate.getAccessPointUrl());
394 notifier.notifySubscriptionListener(body);
395
396 String chunkToken = body.getSubscriptionResultsList().getChunkToken();
397 while (chunkToken != null) {
398 UddiEntityPublisher publisher = new UddiEntityPublisher();
399 publisher.setAuthorizedName(modelSubscription.getAuthorizedName());
400 log.debug("Sending out next chunk: " + chunkToken + " to " + bindingTemplate.getAccessPointUrl());
401 getSubscriptionResults.setChunkToken(chunkToken);
402 resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher);
403 body.setSubscriptionResultsList(resultList);
404 if (resultListContainsChanges(resultList))
405 {
406 notifier.notifySubscriptionListener(body);
407 }
408 chunkToken = body.getSubscriptionResultsList().getChunkToken();
409 }
410
411 if (badNotifications.containsKey(resultList.getSubscription().getSubscriptionKey())) {
412 badNotifications.remove(resultList.getSubscription().getSubscriptionKey());
413 }
414 }
415 } catch (Exception e) {
416 if (e.getCause() instanceof IOException) {
417 addBadNotificationToList(subscriptionKey, bindingTemplate.getAccessPointUrl());
418
419 modelSubscription.setLastNotified(lastNotifiedDate);
420 tx.begin();
421 em.persist(modelSubscription);
422 tx.commit();
423
424
425 }
426 log.error("Unexpected notification exception:" + e.getClass().getCanonicalName() + " " + e.getMessage() + " " + e.getCause());
427 log.debug("Unexpected notification exception:" + e.getClass().getCanonicalName() + " " + e.getMessage() + " " + e.getCause(), e);
428 }
429 } else {
430 log.info("Binding " + bindingTemplate.getEntityKey() + " has an unsupported binding type of "
431 + bindingTemplate.getAccessPointType() + ". Only "
432 + AccessPointType.END_POINT.toString() + " and "
433 + AccessPointType.WSDL_DEPLOYMENT.toString() + " are supported.");
434 addBadNotificationToList(subscriptionKey, bindingTemplate.getAccessPointType() + " not supported");
435 }
436 } else {
437 log.info("There is no valid binding template defined for this subscription: " + modelSubscription.getBindingKey());
438 addBadNotificationToList(subscriptionKey, modelSubscription.getBindingKey() + " not found");
439 }
440
441 } finally {
442 if (tx.isActive()) {
443 tx.rollback();
444 }
445 em.close();
446 }
447 }
448
449 protected UDDISubscriptionImpl getSubscriptionImpl() {
450 return subscriptionImpl;
451 }
452
453 private boolean isTemporarilyDisabled(String subscriptionKey) {
454 if (maxTries > 0 && badNotifications.containsKey(subscriptionKey) && badNotifications.get(subscriptionKey) > maxTries) {
455 log.debug("Subscription " + subscriptionKey + " is temperarily disabled. The notification endpoint"
456 + " could not be reached more then " + maxTries + " times");
457 return true;
458 }
459 return false;
460 }
461
462 private int addBadNotificationToList(String subscriptionKey, String endPoint) {
463 Integer numberOfBadNotifications = 0;
464 if (badNotifications.containsKey(subscriptionKey)) {
465 numberOfBadNotifications = badNotifications.get(subscriptionKey);
466 }
467 badNotifications.put(subscriptionKey, ++numberOfBadNotifications);
468 log.debug("bad notification number " + numberOfBadNotifications + " for subscription "
469 + subscriptionKey + " " + endPoint);
470 return numberOfBadNotifications;
471 }
472
473 }