kjburns1980 Ответов: 0

Мне нужна помощь в решении проблемы параллелизма с использованием синхронизированных блоков


Я написал систему издатель-подписчик для многопоточного проекта, над которым работаю. Насколько я понимаю, при использовании SynchronizedList структурные операции должны быть потокобезопасными, но итерация требует, чтобы список был завернут в synchronized блок.

По этим причинам я не могу понять, почему я получаю здесь исключение comod. Похоже, я принял все необходимые меры предосторожности, чтобы предотвратить это: все итерации находятся в синхронизированных блоках, а сам список является закрытым, поэтому к нему нельзя получить доступ извне класса.

Соответствующий код приведен ниже:
/**
 * Intermediary which receives messages and distributes them to subscribers
 * that have expressed interest in those messages.
 * @author Kevin J. Burns
 *
 */
public class Publisher {
	private List<subscription> subscriptions = 
			Collections.synchronizedList(new ArrayList<>());
	private static final Publisher instance = new Publisher();
	
	private Publisher() {
		// Does nothing special except prevent instantiation.
	}
	
	/**
	 * Registers a subscription to a subset of messages.
	 * @param sub An object that can receive a message when sent
	 * @param type RTTI info for the message type of interest.
	 * @param filter A filter to use to determine whether a future message
	 * matches this subscription 
	 * @return a reference to the subscription, in case subscribers wish to
	 * keep track of their subscriptions
	 */
	public Subscription subscribe(Subscriber sub, 
			Class type, Predicate<message> filter) {
		/*
		 * Implementation note:
		 * While it would be good to make the generic clause more specific
		 * (thereby reducing type-checking code in filters), I'm not sure how 
		 * to instantiate generics from RTTI info.
		 */
		Subscription ret = new Subscription();
		ret.subscriber = sub;
		ret.messageType = type;
		ret.filter = filter;
		
		System.out.println("Subscribing " + sub + " to " + type.toString());
		synchronized (this.subscriptions) {
			this.subscriptions.add(ret);
		}
		return ret;
	}
	
	/**
	 * Removes a subscription.
	 * @param s The subscription to remove.
	 */
	public void unsubscribe(Subscription s) {
		System.out.println("Unsubscribing " + s.subscriber + " from " + s.messageType.toString());
		synchronized (subscriptions) {
			this.subscriptions.remove(s);
		}
	}
	
	/**
	 * Causes a message to be published to appropriate subscribers
	 * @param m the message to publish
	 */
	public void publish(Message msg) {
		System.out.println("Publishing" + msg.toString());
		synchronized (subscriptions) {
			Iterator<subscription> it = subscriptions.iterator();
			
			while (it.hasNext()) {
				Subscription sub = it.next();
				if (sub.test(msg)) {
					sub.subscriber.receiveMessage(msg);
				}
			}
		}
	}
	
	/**
	 * Cancels all subscriptions belonging to a particular subscriber. If this
	 * is used, the subscriber does not have to keep up with subscriptions.
	 * @param sub Subscriber to remove
	 */
	public void cancelAllSubscriptions(Subscriber sub) {
		System.out.println("Unsubscribing " + sub.toString());
		ArrayList<subscription> toRemove = new ArrayList<>();
		
		synchronized(this.subscriptions) {
			Iterator<subscription> it = subscriptions.iterator();
			
			while (it.hasNext()) {
				Subscription s = it.next();
				if (s.subscriber == sub) toRemove.add(s);
			}
			this.subscriptions.removeAll(toRemove);
		}
	}
}


Что я уже пробовал:

Я пробовал удалять синхронизированные блоки в методах модификации списка; я консультировался здесь, Документация Oracle по синхронизированным коллекциям и даже добавление изменчивой переменной 'busy', которая зависает в потоке до тех пор, пока busy == false, а затем устанавливает busy в true перед обработкой и в false после завершения (это просто привело к зависанию одного из моих потоков, и в любом случае это не очень хорошее решение). Я поиграл с идеей какой-то очереди задач, но прежде чем я перейду к этой проблеме, я подумал, что спрошу сообщество, есть ли что-то очевидное, что я упускаю.

Заранее благодарю вас за вашу помощь.

Richard MacCutchan

Весь ваш код выглядит Хорошо, но на самом деле я не могу это проверить. Я могу только предложить вам использовать отладчик, чтобы попытаться захватить больше информации при возникновении исключения.

Henrik Jonsson

Проверьте трассировку стека, чтобы увидеть, попадаете ли вы в вызов удаления или добавления во время итерации из одного и того же потока.

kjburns1980

Спасибо вам за помощь. Хенрик, проблема возникла на лямбде в очереди задач (путь кода довольно прост, и я не смог найти ничего подозрительного), так что вполне возможно, что может быть проблема с очередью задач.

Я нашел CopyOnWriteArrayList, и это хорошо, говорят они, только для ситуаций, когда итерация составляет 90% активности в списке. Для этой цели она составляет около 65%, с чем я пока могу жить. Кроме того, если проблема находится в очереди задач, это, по крайней мере, позаботится о том, чтобы я мог двигаться дальше.

Спасибо вам обоим за ответы.

0 Ответов