/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.search.asynchronous.transport;

import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.commons.authuser.User;
import org.opensearch.search.SearchService;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
import org.opensearch.search.asynchronous.context.active.AsynchronousSearchActiveContext;
import org.opensearch.search.asynchronous.listener.AsynchronousSearchProgressListener;
import org.opensearch.search.asynchronous.listener.AsynchronousSearchTimeoutWrapper;
import org.opensearch.search.asynchronous.listener.PrioritizedActionListener;
import org.opensearch.search.asynchronous.request.SubmitAsynchronousSearchRequest;
import org.opensearch.search.asynchronous.response.AsynchronousSearchResponse;
import org.opensearch.search.asynchronous.service.AsynchronousSearchService;
import org.opensearch.search.asynchronous.task.AsynchronousSearchTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

public class TransportSubmitAsynchronousSearchAction
extends HandledTransportAction<SubmitAsynchronousSearchRequest, AsynchronousSearchResponse> {
    private static final Logger logger = LogManager.getLogger(TransportSubmitAsynchronousSearchAction.class);
    private final ThreadPool threadPool;
    private final ClusterService clusterService;
    private final TransportSearchAction transportSearchAction;
    private final AsynchronousSearchService asynchronousSearchService;
    private final SearchService searchService;

    @Inject
    public TransportSubmitAsynchronousSearchAction(ThreadPool threadPool, TransportService transportService, ClusterService clusterService, ActionFilters actionFilters, AsynchronousSearchService asynchronousSearchService, TransportSearchAction transportSearchAction, SearchService searchService) {
        super("cluster:admin/opendistro/asynchronous_search/submit", transportService, actionFilters, SubmitAsynchronousSearchRequest::new);
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.asynchronousSearchService = asynchronousSearchService;
        this.transportSearchAction = transportSearchAction;
        this.searchService = searchService;
    }

    protected void doExecute(Task task, final SubmitAsynchronousSearchRequest request, final ActionListener<AsynchronousSearchResponse> listener) {
        AsynchronousSearchContext asynchronousSearchContext = null;
        String userStr = (String)this.threadPool.getThreadContext().getTransient("_opendistro_security_user_info");
        User user = User.parse((String)userStr);
        try {
            long relativeStartTimeInMillis = this.threadPool.relativeTimeInMillis();
            asynchronousSearchContext = this.asynchronousSearchService.createAndStoreContext(request, relativeStartTimeInMillis, () -> this.searchService.aggReduceContextBuilder(request.getSearchRequest()), user);
            assert (asynchronousSearchContext.getAsynchronousSearchProgressListener() != null) : "missing progress listener for an active context";
            final AsynchronousSearchProgressListener progressListener = asynchronousSearchContext.getAsynchronousSearchProgressListener();
            final AsynchronousSearchContext context = asynchronousSearchContext;
            SearchRequest searchRequest = new SearchRequest(request.getSearchRequest()){

                public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
                    AsynchronousSearchTask asynchronousSearchTask = new AsynchronousSearchTask(id, type, "indices:data/read/opendistro/asynchronous_search", parentTaskId, headers, (AsynchronousSearchActiveContext)context, request, TransportSubmitAsynchronousSearchAction.this.asynchronousSearchService::onCancelledFreeActiveContext);
                    TransportSubmitAsynchronousSearchAction.this.asynchronousSearchService.bootstrapSearch(asynchronousSearchTask, context.getContextId());
                    PrioritizedActionListener wrappedListener = AsynchronousSearchTimeoutWrapper.wrapScheduledTimeout(TransportSubmitAsynchronousSearchAction.this.threadPool, request.getWaitForCompletionTimeout(), "opensearch_asynchronous_search_generic", listener, actionListener -> {
                        progressListener.searchProgressActionListener().removeListener((ActionListener<AsynchronousSearchResponse>)actionListener);
                        listener.onResponse((Object)context.getAsynchronousSearchResponse());
                    });
                    progressListener.searchProgressActionListener().addOrExecuteListener(wrappedListener);
                    return asynchronousSearchTask;
                }
            };
            searchRequest.setParentTask(task.taskInfo(this.clusterService.localNode().getId(), false).getTaskId());
            this.transportSearchAction.execute((ActionRequest)searchRequest, (ActionListener)progressListener);
        }
        catch (Exception e) {
            logger.error(() -> new ParameterizedMessage("Failed to submit asynchronous search request [{}]", (Object)request), (Throwable)e);
            if (asynchronousSearchContext != null) {
                AsynchronousSearchActiveContext asynchronousSearchActiveContext = (AsynchronousSearchActiveContext)asynchronousSearchContext;
                this.asynchronousSearchService.freeContext(asynchronousSearchActiveContext.getAsynchronousSearchId(), asynchronousSearchActiveContext.getContextId(), user, (ActionListener<Boolean>)ActionListener.wrap(r -> {
                    logger.debug(() -> new ParameterizedMessage("Successfully cleaned up context on submit asynchronous search [{}] on failure", (Object)asynchronousSearchActiveContext.getAsynchronousSearchId()), (Throwable)e);
                    listener.onFailure(e);
                }, ex -> {
                    logger.debug(() -> new ParameterizedMessage("Failed to cleaned up context on submit asynchronous search [{}] on failure", (Object)asynchronousSearchActiveContext.getAsynchronousSearchId()), (Throwable)ex);
                    listener.onFailure(e);
                }));
            }
            listener.onFailure(e);
        }
    }
}

