package com.ibm.ws.xs.continuousquery.client.impl;

import com.ibm.ejs.ras.Tr;
import com.ibm.ejs.ras.TraceComponent;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.OutputFormat;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.SessionHandle;
import com.ibm.websphere.objectgrid.UndefinedMapException;
import com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryCache;
import com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryFilter;
import com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryListener;
import com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryNotificationEvent;
import com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic;
import com.ibm.websphere.objectgrid.continuousquery.filter.AbstractCQFilter;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.objectgrid.Constants;
import com.ibm.ws.objectgrid.ObjectGridImpl;
import com.ibm.ws.objectgrid.SessionHandleImpl;
import com.ibm.ws.objectgrid.continuousquery.ContinuousQueryManagerImpl;
import com.ibm.ws.objectgrid.thread.ThreadPoolManagerFactory;
import com.ibm.ws.util.ThreadPool;
import com.ibm.ws.xs.continuousquery.agent.ContinuousQueryAgent;
import com.ibm.ws.xs.pubsub.subscription.Subscriber;
import com.ibm.ws.xs.pubsub.subscription.SubscriberLifecycle;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/ibm/ws/xs/continuousquery/client/impl/ContinuousQueryTopicImpl.class */
public class ContinuousQueryTopicImpl<KeyType, ValueType> implements ContinuousQueryTopic<KeyType, ValueType>, SubscriberLifecycle {
    private String mapName;
    private String topicName;
    private ContinuousQueryFilter filter;
    private ContinuousQueryCacheImpl<KeyType, ValueType> cqCache;
    private ContinuousQueryManagerImpl cqManager;
    private boolean keysOnly;
    private boolean returnInitialResultSet;
    private boolean notifyOnUpdated;
    private OutputFormat outputFormat;
    private List<ContinuousQuerySubscriber> subscribers;
    private ConcurrentLinkedQueue<ContinuousQueryListener<KeyType, ValueType>> listeners;
    private BlockingQueue<CQListenerNotification<KeyType, ValueType>> notifyQueue;
    private ContinuousQuerySubscriptionManager subMan;
    private String mapSetName;
    private List<Integer> partitions;
    private static final TraceComponent tc = Tr.register(ContinuousQueryCacheImpl.class, Constants.TR_CONTINUOUSQUERY_GROUP_NAME, "com.ibm.ws.objectgrid.resources.ObjectGridMessages");
    private static BlockingQueue<ListenerNotifier> runnableList = null;
    private static ThreadPool CQLThreadPool = null;
    private static ListenerNotifierQueueManager runnableManager = null;
    private int messageVersion = 1;
    private ListenerNotifier<KeyType, ValueType> listenerNotifier = null;
    private boolean listenerNotifierRunning = false;
    private Integer listenerMutex = new Integer(935);

    /* loaded from: input_file:com/ibm/ws/xs/continuousquery/client/impl/ContinuousQueryTopicImpl$CQListenerNotification.class */
    public static class CQListenerNotification<KeyType, ValueType> implements ContinuousQueryNotificationEvent<KeyType, ValueType> {
        private KeyType key;
        private ValueType value;
        private ContinuousQueryNotificationEvent.Reason reason;
        private int partitionID;
        boolean keysOnly;

        public CQListenerNotification(KeyType keytype, ValueType valuetype, ContinuousQueryNotificationEvent.Reason reason, int i, boolean z) {
            this.key = keytype;
            this.value = valuetype;
            this.reason = reason;
            this.partitionID = i;
            this.keysOnly = z;
        }

        @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryNotificationEvent
        public KeyType getKey() {
            return this.key;
        }

        @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryNotificationEvent
        public ValueType getValue() {
            return this.value;
        }

        @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryNotificationEvent
        public ContinuousQueryNotificationEvent.Reason getReason() {
            return this.reason;
        }

        @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryNotificationEvent
        public boolean isKeysOnly() {
            return this.keysOnly;
        }

        @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryNotificationEvent
        public SessionHandle getSessionHandle() {
            return new SessionHandleImpl(this.partitionID);
        }
    }

    /* loaded from: input_file:com/ibm/ws/xs/continuousquery/client/impl/ContinuousQueryTopicImpl$ListenerNotifier.class */
    public static class ListenerNotifier<KeyType, ValueType> implements Runnable {
        ContinuousQueryTopicImpl<KeyType, ValueType> topic;
        public static int LOOP_SIZE = 50;
        public static int QUEUE_POLL_TIME = 100;
        private long userStart = 0;
        private long userEnd = 0;
        private long userCumulative = 0;
        private long totalStart = 0;
        private long totalEnd = 0;
        private boolean stop = false;
        private Object mutex;

        public ListenerNotifier(ContinuousQueryTopicImpl<KeyType, ValueType> continuousQueryTopicImpl, Object obj) {
            this.topic = null;
            this.mutex = null;
            this.topic = continuousQueryTopicImpl;
            this.mutex = obj;
        }

        public void stop() {
            this.stop = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = TraceComponent.isAnyTracingEnabled() && ContinuousQueryTopicImpl.tc.isDebugEnabled();
            boolean z2 = false;
            int i = 0;
            if (z) {
                this.totalStart = System.nanoTime();
                this.userCumulative = 0L;
            }
            BlockingQueue<CQListenerNotification<KeyType, ValueType>> notifyQueue = this.topic.getNotifyQueue();
            for (int i2 = 0; i2 < LOOP_SIZE; i2++) {
                if (this.stop) {
                    if (z) {
                        Tr.debug(ContinuousQueryTopicImpl.tc, "exiting at top of loop");
                        return;
                    }
                    return;
                }
                ConcurrentLinkedQueue<ContinuousQueryListener<KeyType, ValueType>> listeners = this.topic.getListeners();
                CQListenerNotification<KeyType, ValueType> cQListenerNotification = null;
                try {
                    cQListenerNotification = notifyQueue.poll(QUEUE_POLL_TIME, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                }
                if (cQListenerNotification != null) {
                    if (z) {
                        i++;
                    }
                    Iterator<ContinuousQueryListener<KeyType, ValueType>> it = listeners.iterator();
                    while (it.hasNext()) {
                        ContinuousQueryListener<KeyType, ValueType> next = it.next();
                        if (z) {
                            this.userStart = System.nanoTime();
                        }
                        try {
                            next.cacheUpdated(cQListenerNotification);
                        } catch (Throwable th) {
                            FFDCFilter.processException(th, getClass().getName(), "584");
                            if (z) {
                                Tr.debug(ContinuousQueryTopicImpl.tc, "ListenerNotifier - caught user exception", th);
                            }
                        }
                        if (z) {
                            this.userEnd = System.nanoTime();
                            this.userCumulative += this.userEnd - this.userStart;
                            z2 = true;
                        }
                    }
                }
            }
            if (z && z2) {
                this.totalEnd = System.nanoTime();
                Tr.debug(ContinuousQueryTopicImpl.tc, "Total time for " + i + " notifications to " + this.topic.getListeners().size() + " listeners is " + (this.totalEnd - this.totalStart) + "ns of which " + this.userCumulative + "ns is user time");
            }
            ListenerNotifier<KeyType, ValueType> listenerNotifier = new ListenerNotifier<>(this.topic, this.mutex);
            synchronized (this.mutex) {
                if (!this.stop) {
                    this.topic.setListenerNotifier(listenerNotifier);
                }
            }
            if (!this.stop) {
                this.topic.addListenerNotifierToRun(listenerNotifier);
            } else if (z) {
                Tr.debug(ContinuousQueryTopicImpl.tc, "exiting at bottom");
            }
        }
    }

    /* loaded from: input_file:com/ibm/ws/xs/continuousquery/client/impl/ContinuousQueryTopicImpl$ListenerNotifierQueueManager.class */
    public static class ListenerNotifierQueueManager implements Runnable {
        ContinuousQueryTopicImpl topic;
        private boolean stop = false;

        public ListenerNotifierQueueManager(ContinuousQueryTopicImpl continuousQueryTopicImpl) {
            this.topic = null;
            this.topic = continuousQueryTopicImpl;
        }

        public void stop() {
            this.stop = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            ThreadPool listenerThreadPool = this.topic.getListenerThreadPool();
            BlockingQueue<ListenerNotifier> lNQueue = this.topic.getLNQueue();
            while (true) {
                try {
                    listenerThreadPool.execute(lNQueue.take());
                } catch (IllegalStateException e) {
                    FFDCFilter.processException(e, getClass().getName(), "577", this);
                } catch (InterruptedException e2) {
                    FFDCFilter.processException(e2, getClass().getName(), "579", this);
                }
                if (this.stop && lNQueue.isEmpty()) {
                    return;
                }
            }
        }
    }

    public ContinuousQueryTopicImpl(String str, ContinuousQueryFilter continuousQueryFilter, ContinuousQueryManagerImpl continuousQueryManagerImpl, String str2, boolean z, boolean z2, boolean z3, String str3, Collection<ContinuousQueryListener<KeyType, ValueType>> collection, boolean z4, OutputFormat outputFormat, List<Integer> list) {
        this.keysOnly = true;
        this.returnInitialResultSet = false;
        this.notifyOnUpdated = false;
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            Tr.debug(tc, "<init> - CQTopicImpl", new Object[]{str, continuousQueryFilter, continuousQueryManagerImpl});
        }
        this.mapName = str;
        this.filter = continuousQueryFilter;
        this.topicName = str2;
        this.keysOnly = z;
        this.returnInitialResultSet = z2;
        this.notifyOnUpdated = z3;
        this.outputFormat = outputFormat;
        this.mapSetName = str3;
        this.partitions = list;
        this.notifyQueue = new LinkedBlockingQueue();
        this.listeners = new ConcurrentLinkedQueue<>();
        if (collection != null) {
            Iterator<ContinuousQueryListener<KeyType, ValueType>> it = collection.iterator();
            while (it.hasNext()) {
                addListener(it.next());
            }
        }
        this.cqManager = continuousQueryManagerImpl;
        this.cqCache = new ContinuousQueryCacheImpl<>(z, z4);
        this.subscribers = new CopyOnWriteArrayList();
        this.subMan = new ContinuousQuerySubscriptionManager(this.cqManager.getObjectGrid(), str3, str, str2, this.cqCache, this, list);
        this.subMan.init(this);
    }

    public synchronized void publisherMoved(ContinuousQuerySubscriber continuousQuerySubscriber) {
        if (tc.isEntryEnabled()) {
            Tr.entry(tc, "publisherMoved - " + continuousQuerySubscriber.getTopicName());
        }
        ContinuousQueryAgent continuousQueryAgent = new ContinuousQueryAgent(this.mapName, this.topicName, (AbstractCQFilter) this.filter, this.keysOnly, this.returnInitialResultSet, this.notifyOnUpdated, this.messageVersion);
        Boolean bool = false;
        int i = 0;
        while (!bool.booleanValue() && i < 30) {
            try {
                Session session = this.cqManager.getObjectGrid().getSession();
                session.setTransactionTimeout(30);
                ObjectMap map = session.getMap(this.mapName);
                session.setSessionHandle(new SessionHandleImpl(continuousQuerySubscriber.getPartitionID()));
                Object callReduceAgent = map.getAgentManager().callReduceAgent(continuousQueryAgent);
                if (callReduceAgent instanceof Boolean) {
                    bool = (Boolean) callReduceAgent;
                }
                session.setSessionHandle(null);
            } catch (Throwable th) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "runAgentToPartition failed for topic: " + this.topicName);
                }
                FFDCFilter.processException(th, getClass().getName(), "216", this);
                bool = false;
            }
            i++;
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
        continuousQuerySubscriber.subscribe();
        if (tc.isEntryEnabled()) {
            Tr.exit(tc, "publisherMoved - " + continuousQuerySubscriber.getTopicName());
        }
    }

    public boolean runAgentToPartition(int i) throws UndefinedMapException {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            Tr.entry(tc, "runAgentToPartition for topic " + this.topicName);
        }
        ((ObjectGridImpl) this.cqManager.getObjectGrid()).getMap(this.mapName, null, false, true);
        ContinuousQueryAgent continuousQueryAgent = new ContinuousQueryAgent(this.mapName, this.topicName, (AbstractCQFilter) this.filter, this.keysOnly, this.returnInitialResultSet, this.notifyOnUpdated, this.messageVersion);
        Boolean bool = false;
        try {
            Session session = this.cqManager.getObjectGrid().getSession();
            session.setTransactionTimeout(30);
            ObjectMap map = session.getMap(this.mapName);
            session.setSessionHandle(new SessionHandleImpl(i));
            Object callReduceAgent = map.getAgentManager().callReduceAgent(continuousQueryAgent);
            if (callReduceAgent instanceof Boolean) {
                bool = (Boolean) callReduceAgent;
            }
            session.setSessionHandle(null);
        } catch (Throwable th) {
            if (tc.isDebugEnabled()) {
                Tr.debug(tc, "runAgentToPartition failed for topic: " + this.topicName);
            }
            FFDCFilter.processException(th, getClass().getName(), "216", this);
            bool = false;
        }
        boolean booleanValue = bool.booleanValue();
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            Tr.exit(tc, "runAgentToPartition", Boolean.valueOf(booleanValue));
        }
        return booleanValue;
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public void addListener(ContinuousQueryListener<KeyType, ValueType> continuousQueryListener) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            Tr.entry(tc, "addListener", continuousQueryListener);
        }
        synchronized (ContinuousQueryTopicImpl.class) {
            if (CQLThreadPool == null) {
                CQLThreadPool = ThreadPoolManagerFactory.getThreadPoolManager().getThreadPool("ContinuousQueryListenerThreadPool", 1, 15);
            }
            if (runnableList == null) {
                runnableList = new LinkedBlockingQueue();
            }
            if (runnableManager == null) {
                runnableManager = new ListenerNotifierQueueManager(this);
                try {
                    CQLThreadPool.execute(runnableManager);
                } catch (IllegalStateException e) {
                    FFDCFilter.processException(e, getClass().getName(), "240", this);
                } catch (InterruptedException e2) {
                    FFDCFilter.processException(e2, getClass().getName(), "242", this);
                }
            }
        }
        if (continuousQueryListener == null) {
            if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
                Tr.exit(tc, "addListener");
                return;
            }
            return;
        }
        String str = null;
        synchronized (this.listenerMutex) {
            if (this.listeners.contains(continuousQueryListener)) {
                this.listeners.remove(continuousQueryListener);
                if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
                    str = "existing listener";
                }
            } else if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
                str = "new listener";
            }
            this.listeners.add(continuousQueryListener);
            if (this.listenerNotifier == null) {
                this.listenerNotifier = new ListenerNotifier<>(this, this.listenerMutex);
                str = str + " and new ListenerNotifier";
            }
            if (!this.listenerNotifierRunning) {
                addListenerNotifierToRun(this.listenerNotifier);
                this.listenerNotifierRunning = true;
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            Tr.exit(tc, "addListener", str);
        }
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public boolean removeListener(ContinuousQueryListener<KeyType, ValueType> continuousQueryListener) {
        boolean remove;
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            Tr.entry(tc, "removeListener", continuousQueryListener);
        }
        synchronized (this.listenerMutex) {
            remove = this.listeners.remove(continuousQueryListener);
            if (this.listeners.isEmpty()) {
                if (this.listenerNotifierRunning) {
                    this.listenerNotifier.stop();
                    this.listenerNotifierRunning = false;
                }
                this.listenerNotifier = null;
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            Tr.exit(tc, "removeListener", Boolean.valueOf(remove));
        }
        return remove;
    }

    public boolean hasListeners() {
        return !this.listeners.isEmpty();
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public void removeAllListeners() {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            Tr.entry(tc, "removeAllListeners");
        }
        synchronized (this.listenerMutex) {
            this.listeners.clear();
            if (this.listenerNotifierRunning) {
                this.listenerNotifier.stop();
                this.listenerNotifierRunning = false;
            }
            this.listenerNotifier = null;
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            Tr.exit(tc, "removeAllListeners");
        }
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public Collection<ContinuousQueryListener<KeyType, ValueType>> getAllListeners() {
        LinkedList linkedList;
        synchronized (this.listenerMutex) {
            linkedList = new LinkedList(this.listeners);
        }
        return linkedList;
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public ContinuousQueryCache<KeyType, ValueType> getCache() {
        return this.cqCache;
    }

    public String getMapName() {
        return this.mapName;
    }

    public ContinuousQueryFilter getFilter() {
        return this.filter;
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public String getName() {
        return this.topicName;
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public boolean isKeysOnlyCache() {
        return this.keysOnly;
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public boolean noCache() {
        return this.cqCache.noCache();
    }

    public String toString() {
        return this.topicName;
    }

    public void addNotification(CQListenerNotification<KeyType, ValueType> cQListenerNotification) {
        if (((CQListenerNotification) cQListenerNotification).reason != ContinuousQueryNotificationEvent.Reason.UPDATED || this.notifyOnUpdated) {
            boolean z = false;
            while (!z) {
                z = this.notifyQueue.offer(cQListenerNotification);
                if (!z) {
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }

    BlockingQueue<CQListenerNotification<KeyType, ValueType>> getNotifyQueue() {
        return this.notifyQueue;
    }

    ConcurrentLinkedQueue<ContinuousQueryListener<KeyType, ValueType>> getListeners() {
        return this.listeners;
    }

    ThreadPool getListenerThreadPool() {
        return CQLThreadPool;
    }

    BlockingQueue<ListenerNotifier> getLNQueue() {
        return runnableList;
    }

    void setListenerNotifier(ListenerNotifier<KeyType, ValueType> listenerNotifier) {
        this.listenerNotifier = listenerNotifier;
    }

    public void destroy() {
        this.subMan.destroy();
        synchronized (this.listenerMutex) {
            Iterator<ContinuousQueryListener<KeyType, ValueType>> it = getListeners().iterator();
            while (it.hasNext()) {
                removeListener(it.next());
            }
        }
        Iterator<ContinuousQuerySubscriber> it2 = this.subscribers.iterator();
        while (it2.hasNext()) {
            it2.next().cancelSubscription();
        }
    }

    void addListenerNotifierToRun(ListenerNotifier<KeyType, ValueType> listenerNotifier) {
        if (listenerNotifier != null) {
            runnableList.offer(listenerNotifier);
        }
    }

    public boolean allSubscribersAccepted() {
        if (tc.isEntryEnabled()) {
            Tr.entry(tc, "allSubscribersAccepted");
        }
        boolean z = false;
        for (ContinuousQuerySubscriber continuousQuerySubscriber : this.subscribers) {
            z = continuousQuerySubscriber.isSubscriptionAccepted();
            if (tc.isDebugEnabled()) {
                Tr.debug(tc, "Subscriber " + continuousQuerySubscriber + " subscribed=" + z);
            }
            if (!z) {
                break;
            }
        }
        if (tc.isEntryEnabled()) {
            Tr.exit(tc, "allSubscribersAccepted: " + z);
        }
        return z;
    }

    public int numberSubscribers() {
        return this.subscribers.size();
    }

    public List<ContinuousQuerySubscriber> subscribers() {
        return this.subscribers;
    }

    @Override // com.ibm.ws.xs.pubsub.subscription.SubscriberLifecycle
    public void addSubscriber(Subscriber subscriber) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            Tr.debug(tc, "addSubscriber", new Object[]{subscriber, Integer.valueOf(this.subscribers.size())});
        }
        this.subscribers.add((ContinuousQuerySubscriber) subscriber);
    }

    @Override // com.ibm.ws.xs.pubsub.subscription.SubscriberLifecycle
    public Subscriber removeSubscriber(int i) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            Tr.debug(tc, "removeSubscriber", new Object[]{Integer.valueOf(i), Integer.valueOf(this.subscribers.size())});
        }
        ContinuousQuerySubscriber continuousQuerySubscriber = null;
        Iterator<ContinuousQuerySubscriber> it = this.subscribers.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ContinuousQuerySubscriber next = it.next();
            if (next.getPartitionID() == i) {
                continuousQuerySubscriber = next;
                break;
            }
        }
        boolean z = false;
        if (continuousQuerySubscriber != null) {
            z = this.subscribers.remove(continuousQuerySubscriber);
        }
        if (z) {
            return continuousQuerySubscriber;
        }
        return null;
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public List<Integer> getPartitions() {
        return this.partitions;
    }

    @Override // com.ibm.websphere.objectgrid.continuousquery.ContinuousQueryTopic
    public OutputFormat getOutputFormat() {
        return this.outputFormat;
    }
}
