import { ApolloLink, Observable } from '@apollo/client';
import type { FetchResult } from '@apollo/client';
import { lazyExecuteRequest, lazyUnsubscribe } from 'owa-data-worker-client';
import { isDataWorkerPlatformEnabled } from 'owa-data-worker-check';
import { getIsDataWorkerFaulted } from 'owa-config';
import { sanitizeVariables } from 'owa-apollo-links';
import { getOperationAST } from 'graphql';
import type { Subscription } from 'zen-observable-ts';
import { PerformanceDatapoint } from 'owa-analytics';

/**
 * Observable of Apollo `FetchResult`s with all three type arguments fixed to
 * `Record<string, any>`. It is the type of the value the link's request handler
 * builds and returns, and the type `addInitialQueryTimings` accepts and hands back.
 */
type WorkerLinkResult = Observable<
    FetchResult<Record<string, any>, Record<string, any>, Record<string, any>>
>;

// Source of the `requestId` handed to the worker: read and then incremented each time a
// worker-bound observable created by the link is subscribed.
let requestCounter = 0;

// Stays true until the first `query` operation whose name is not 'bootstrap' has been wrapped
// with timing instrumentation; cleared right after so only one operation is instrumented.
let isInitialQuery = true;

/**
 * Creates an Apollo link that runs operations on the data worker when possible.
 *
 * An operation is passed to `forward` (the remaining main-thread links) when the data worker
 * platform is not enabled, the worker is already marked as faulted, or the operation context
 * sets `gatewayGraphRequest`. Otherwise the operation is executed on the worker. If the worker
 * request errors while the worker is marked as faulted, the operation is re-issued once through
 * `forward` and that fallback subscription drives the outer observer from then on.
 *
 * The first `query` operation whose name is not `bootstrap` is additionally wrapped with
 * `addInitialQueryTimings`; this happens at most once per module instance.
 *
 * @returns the configured `ApolloLink`
 */
export default function createDataWorkerLink() {
    const link = new ApolloLink((operation, forward) => {
        let result: WorkerLinkResult;
        let dispatchedToWorker = false;

        if (
            !isDataWorkerPlatformEnabled() ||
            getIsDataWorkerFaulted() ||
            operation.getContext()?.gatewayGraphRequest
        ) {
            // forward to the rest of the main renderer links
            result = forward(operation);
        } else {
            dispatchedToWorker = true;

            // try to run the request on the worker.
            result = new Observable(outer => {
                const requestId = requestCounter++;

                // stays undefined unless the worker request faults and we fall back to main
                let fallbackSubscription: Subscription | undefined;

                // true once this worker request itself has delivered complete/error to outer
                let settled = false;

                const observer = {
                    next: outer.next?.bind(outer),
                    complete: () => {
                        if (fallbackSubscription) {
                            // if we had an error and issued a fallback request, then the fallback
                            // subscription owns the communication channel to the outer observer.
                            // the callback executing here is from the original request that faulted
                            // and its registration to outer is now obsolete, so do nothing
                            return;
                        }

                        // otherwise, complete normally
                        settled = true;
                        outer.complete?.();
                    },
                    error: (e: any) => {
                        if (getIsDataWorkerFaulted()) {
                            // if the request fails because the worker didn't initialize, run it on
                            // main and bind the subscription directly to the outer observer, which
                            // will receive future callbacks from this fallback subscription
                            // (but only have to do this once)
                            if (!fallbackSubscription) {
                                fallbackSubscription = forward(operation)?.subscribe(outer);
                            }
                        } else {
                            // otherwise, treat it like a normal error
                            settled = true;
                            outer.error?.(e);
                        }
                    },
                };

                // sanitize/copy the variables, because things like mobx variables do not clone well over to the worker
                sanitizeVariables(operation);

                // execute on the worker. The call is asynchronous; if its promise rejects
                // (presumably e.g. when the lazy import itself fails) nothing else would notify
                // outer, so route the rejection through the same error path as a worker error.
                // That keeps the faulted-worker fallback in play, and the settled check avoids
                // reporting a second terminal event for a request that already finished.
                lazyExecuteRequest
                    .importAndExecute(requestId, operation, observer)
                    .catch((e: unknown) => {
                        if (!settled) {
                            observer.error(e);
                        }
                    });

                // observables return an unsubscribe callback
                return () => {
                    lazyUnsubscribe.importAndExecute(requestId).catch(() => {
                        /* ignore errors */
                    });
                    fallbackSubscription?.unsubscribe();
                };
            });
        }

        // if this is the initial query to the worker, add timings
        if (isInitialQuery) {
            const opNode = getOperationAST(operation.query);
            if (opNode?.operation === 'query') {
                const queryName = opNode?.name?.value || 'unnamed';
                if (queryName !== 'bootstrap') {
                    result = addInitialQueryTimings(result, dispatchedToWorker, queryName);
                    isInitialQuery = false;
                }
            }
        }

        return result;
    });

    return link;
}

/**
 * Wraps an observable so that its lifetime is recorded on a `DataWorker_FirstQuery`
 * performance datapoint.
 *
 * The datapoint is created, tagged with `onWorker` / `queryName`, and given its `LinkStart`
 * waterfall entry as soon as this function is called, not when the returned observable is
 * subscribed. Values emitted by `result` are passed through unchanged. On completion or error
 * the matching waterfall entry is added, the event is forwarded downstream, and only then is
 * the datapoint ended (completion also records `sessionNow` first).
 *
 * @param result observable to instrument
 * @param onWorker recorded as custom data; the caller passes whether the operation was
 *   dispatched to the worker
 * @param queryName recorded as custom data
 * @returns a new observable mirroring `result`
 */
function addInitialQueryTimings(result: WorkerLinkResult, onWorker: boolean, queryName: string) {
    const datapoint = new PerformanceDatapoint('DataWorker_FirstQuery');
    datapoint.addCustomData({ onWorker, queryName });
    datapoint.addToCustomWaterfall(1, 'LinkStart');

    const source = result;
    const instrumented: WorkerLinkResult = new Observable(downstream => {
        const subscription = source.subscribe({
            next: downstream.next?.bind(downstream),
            complete: () => {
                datapoint.addToCustomWaterfall(2, 'LinkComplete');
                downstream?.complete();
                datapoint.addCustomData({ sessionNow: performance.now() });
                datapoint.end();
            },
            error: failure => {
                datapoint.addToCustomWaterfall(3, 'LinkError');
                downstream?.error(failure);
                datapoint.end();
            },
        });

        // tearing down the wrapper tears down the subscription to the wrapped observable
        return () => subscription.unsubscribe();
    });

    return instrumented;
}
