Мне нужна помощь в решении проблемы параллелизма с использованием синхронизированных блоков
Я написал систему издатель-подписчик для многопоточного проекта, над которым работаю. Насколько я понимаю, при использовании SynchronizedList структурные операции должны быть потокобезопасными, но итерация требует, чтобы список был завернут в
synchronized
блок.По этим причинам я не могу понять, почему я получаю здесь исключение comod. Похоже, я принял все необходимые меры предосторожности, чтобы предотвратить это: все итерации находятся в синхронизированных блоках, а сам список является закрытым, поэтому к нему нельзя получить доступ извне класса.
Соответствующий код приведен ниже:
/** * Intermediary which receives messages and distributes them to subscribers * that have expressed interest in those messages. * @author Kevin J. Burns * */ public class Publisher { private List<subscription> subscriptions = Collections.synchronizedList(new ArrayList<>()); private static final Publisher instance = new Publisher(); private Publisher() { // Does nothing special except prevent instantiation. } /** * Registers a subscription to a subset of messages. * @param sub An object that can receive a message when sent * @param type RTTI info for the message type of interest. * @param filter A filter to use to determine whether a future message * matches this subscription * @return a reference to the subscription, in case subscribers wish to * keep track of their subscriptions */ public Subscription subscribe(Subscriber sub, Class type, Predicate<message> filter) { /* * Implementation note: * While it would be good to make the generic clause more specific * (thereby reducing type-checking code in filters), I'm not sure how * to instantiate generics from RTTI info. */ Subscription ret = new Subscription(); ret.subscriber = sub; ret.messageType = type; ret.filter = filter; System.out.println("Subscribing " + sub + " to " + type.toString()); synchronized (this.subscriptions) { this.subscriptions.add(ret); } return ret; } /** * Removes a subscription. * @param s The subscription to remove. */ public void unsubscribe(Subscription s) { System.out.println("Unsubscribing " + s.subscriber + " from " + s.messageType.toString()); synchronized (subscriptions) { this.subscriptions.remove(s); } } /** * Causes a message to be published to appropriate subscribers * @param m the message to publish */ public void publish(Message msg) { System.out.println("Publishing" + msg.toString()); synchronized (subscriptions) { Iterator<subscription> it = subscriptions.iterator(); while (it.hasNext()) { Subscription sub = it.next(); if (sub.test(msg)) { sub.subscriber.receiveMessage(msg); } } } } /** * Cancels all subscriptions belonging to a particular subscriber. If this * is used, the subscriber does not have to keep up with subscriptions. * @param sub Subscriber to remove */ public void cancelAllSubscriptions(Subscriber sub) { System.out.println("Unsubscribing " + sub.toString()); ArrayList<subscription> toRemove = new ArrayList<>(); synchronized(this.subscriptions) { Iterator<subscription> it = subscriptions.iterator(); while (it.hasNext()) { Subscription s = it.next(); if (s.subscriber == sub) toRemove.add(s); } this.subscriptions.removeAll(toRemove); } } }
Что я уже пробовал:
Я пробовал удалять синхронизированные блоки в методах модификации списка; я консультировался здесь, Документация Oracle по синхронизированным коллекциям и даже добавление изменчивой переменной 'busy', которая зависает в потоке до тех пор, пока busy == false, а затем устанавливает busy в true перед обработкой и в false после завершения (это просто привело к зависанию одного из моих потоков, и в любом случае это не очень хорошее решение). Я поиграл с идеей какой-то очереди задач, но прежде чем я перейду к этой проблеме, я подумал, что спрошу сообщество, есть ли что-то очевидное, что я упускаю.
Заранее благодарю вас за вашу помощь.
Richard MacCutchan
Весь ваш код выглядит Хорошо, но на самом деле я не могу это проверить. Я могу только предложить вам использовать отладчик, чтобы попытаться захватить больше информации при возникновении исключения.
Henrik Jonsson
Проверьте трассировку стека, чтобы увидеть, попадаете ли вы в вызов удаления или добавления во время итерации из одного и того же потока.
kjburns1980
Спасибо вам за помощь. Хенрик, проблема возникла на лямбде в очереди задач (путь кода довольно прост, и я не смог найти ничего подозрительного), так что вполне возможно, что может быть проблема с очередью задач.
Я нашел CopyOnWriteArrayList, и это хорошо, говорят они, только для ситуаций, когда итерация составляет 90% активности в списке. Для этой цели она составляет около 65%, с чем я пока могу жить. Кроме того, если проблема находится в очереди задач, это, по крайней мере, позаботится о том, чтобы я мог двигаться дальше.
Спасибо вам обоим за ответы.