package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.Json;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.JsonRpcResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscribeRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.UnsubscribeRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.response.SubscriptionResponse;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;

/* loaded from: input_file:org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/SubscriptionManager.class */
public class SubscriptionManager extends AbstractVerticle {
    private static final Logger LOG = LogManager.getLogger();
    public static final String EVENTBUS_REMOVE_SUBSCRIPTIONS_ADDRESS = "SubscriptionManager::removeSubscriptions";
    private final AtomicLong subscriptionCounter = new AtomicLong(0);
    private final Map<Long, Subscription> subscriptions = new ConcurrentHashMap();
    private final SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder();
    private final LabelledMetric<Counter> subscribeCounter;
    private final LabelledMetric<Counter> unsubscribeCounter;

    public SubscriptionManager(MetricsSystem metricsSystem) {
        this.subscribeCounter = metricsSystem.createLabelledCounter(BesuMetricCategory.RPC, "subscription_subscribe_total", "Total number of subscriptions", "type");
        this.unsubscribeCounter = metricsSystem.createLabelledCounter(BesuMetricCategory.RPC, "subscription_unsubscribe_total", "Total number of unsubscriptions", "type");
    }

    public void start() {
        this.vertx.eventBus().consumer(EVENTBUS_REMOVE_SUBSCRIPTIONS_ADDRESS, this::removeSubscriptions);
    }

    public Long subscribe(SubscribeRequest subscribeRequest) {
        LOG.debug("Subscribe request {}", subscribeRequest);
        this.subscribeCounter.labels(subscribeRequest.getSubscriptionType().getCode()).inc();
        Subscription build = this.subscriptionBuilder.build(this.subscriptionCounter.incrementAndGet(), subscribeRequest.getConnectionId(), subscribeRequest);
        this.subscriptions.put(build.getSubscriptionId(), build);
        return build.getSubscriptionId();
    }

    public boolean unsubscribe(UnsubscribeRequest unsubscribeRequest) {
        Long subscriptionId = unsubscribeRequest.getSubscriptionId();
        String connectionId = unsubscribeRequest.getConnectionId();
        LOG.debug("Unsubscribe request subscriptionId = {}", subscriptionId);
        Subscription subscription = this.subscriptions.get(subscriptionId);
        if (subscription == null || !subscription.getConnectionId().equals(connectionId)) {
            throw new SubscriptionNotFoundException(subscriptionId);
        }
        destroySubscription(subscriptionId.longValue());
        return true;
    }

    private void destroySubscription(long j) {
        Subscription remove = this.subscriptions.remove(Long.valueOf(j));
        if (remove != null) {
            this.unsubscribeCounter.labels(remove.getSubscriptionType().getCode()).inc();
        }
    }

    private void removeSubscriptions(Message<String> message) {
        String str = (String) message.body();
        if (str == null || "".equals(str)) {
            LOG.warn("Received invalid connectionId ({}). No subscriptions removed.", str);
        }
        LOG.debug("Removing subscription for connectionId {}", str);
        this.subscriptions.values().stream().filter(subscription -> {
            return subscription.getConnectionId().equals(str);
        }).forEach(subscription2 -> {
            destroySubscription(subscription2.getSubscriptionId().longValue());
        });
    }

    public Subscription getSubscriptionById(Long l) {
        return this.subscriptions.get(l);
    }

    public <T> List<T> subscriptionsOfType(SubscriptionType subscriptionType, Class<T> cls) {
        return (List) this.subscriptions.values().stream().filter(subscription -> {
            return subscription.isType(subscriptionType);
        }).map(this.subscriptionBuilder.mapToSubscriptionClass(cls)).collect(Collectors.toList());
    }

    public void sendMessage(Long l, JsonRpcResult jsonRpcResult) {
        SubscriptionResponse subscriptionResponse = new SubscriptionResponse(l.longValue(), jsonRpcResult);
        Subscription subscription = this.subscriptions.get(l);
        if (subscription != null) {
            this.vertx.eventBus().send(subscription.getConnectionId(), Json.encode(subscriptionResponse));
        }
    }

    public <T> void notifySubscribersOnWorkerThread(SubscriptionType subscriptionType, Class<T> cls, Consumer<List<T>> consumer) {
        this.vertx.executeBlocking(promise -> {
            consumer.accept(subscriptionsOfType(subscriptionType, cls));
            promise.complete();
        }, asyncResult -> {
            if (asyncResult.failed()) {
                LOG.error("Failed to notify subscribers.", asyncResult.cause());
            }
        });
    }
}
