import {csvDataProducer, ParseResult} from './parser/CsvParser';
import type {ProcessMessage} from './types/ProcessMessage';
import {Subject, EMPTY, of} from 'rxjs';
import {bufferTime, catchError, mergeMap, scan, map} from 'rxjs/operators';
import {postToServer} from './http/Request';

type Options = {
  [key: string]: number | string | undefined;
  /**
   *  Time to buffer before sending in milliseconds
   */
  bufferTime?: number;
  /**
   * max size per batch
   */
  batchSize?: number;
  /**
   * max concurrency for posting data
   */
  maxConcurrency?: number;
  /**
   * The timeout of batch request in milliseconds
   */
  timeout?: number;
  /**
   * The maximum data could be queued in memory
   */
  maxQueueSize?: number;
  /**
   * The type of the data. e.g. UPLOAD, PC_UPLOAD.
   */
  dataType?: string;

  dataSourceId: string;
};
const defaultOptions: Options = {
  dataSourceId: '',
  bufferTime: 500,
  batchSize: 500,
  timeout: 10_000,
  maxQueueSize: 100_000, // 100K * 0.5K/records = 50M
  maxConcurrency: 2,
  dataType: 'UPLOAD',
};

type ProcessArgs = {
  file: File;
  taskId: string;
  msgHandle: (msg: ProcessMessage) => void;
  options: Options;
};

type AbortHandle = () => void;

type ProcessMultiPipe = {
  /**
   * Submit a new file process task
   */
  submit: (args: ProcessArgs) => void;
  /**
   * Abort a procesing task. id is the `ProcessArgs.taskId` used when submit the task.
   */
  abort: (id: string) => void;
};

type ProcessTask = {
  args: ProcessArgs;
  aborted: boolean;
  abortHandle?: AbortHandle;
};

// Don't use large ConcurrentFile number to better control the total concurrent HTTP requests.
export default function creatProcessMultiPipe(maxConcurrentFile:number = 1): ProcessMultiPipe {
  const fileSubject = new Subject<ProcessArgs>();
  const processTasks = new Map<string, ProcessTask>();
  fileSubject
    .pipe(
      map((args) => {
        const task = {args, aborted: false} as ProcessTask;
        processTasks.set(args.taskId, task);
        return task;
      }),
      mergeMap(async (task) => {
        if (task.aborted) {
          return task;
        }
        await new Promise<boolean>((resolve) => {
          task.abortHandle = startProcess(task.args, resolve);
        }).then((value) => (task.aborted = value));
        return task;
      }, maxConcurrentFile),
      map((task) => {
        processTasks.delete(task.args.taskId);
        if (!task.aborted) {
          // send complete message if task not aborted.
          task.args.msgHandle({type: 'completed', data: {}});
        }
      }),
    )
    .subscribe();
  return {
    submit: (args) => fileSubject.next(args),
    abort: (id) => {
      const task = processTasks.get(id);
      if (task) {
        task.aborted = true;
        task.abortHandle?.call(undefined);
        task.args.msgHandle({type: 'error', data: {msg: 'User Aborted'}});
      }
    },
  };
}

function mergeOptions(option: Options, defaultOpt: Options): Options {
  let opt = defaultOpt;
  Object.keys(option).forEach((key) => {
    if (option[key]) {
      opt[key] = option[key];
    }
  });
  return opt;
}

export function startProcess(
  {file, msgHandle, options = defaultOptions}: ProcessArgs,
  signalComplete: (val: any) => void,
): AbortHandle {
  const opts = mergeOptions(options, defaultOptions);
  let dataSubject = new Subject<ParseResult>();
  const {dataSourceId, dataType} = opts;
  let parseError = 0;
  let totalRowsParsed = 0;
  let aborted = false;
  let parserProducer = csvDataProducer(
    file,
    (data: ParseResult) => dataSubject.next(data),
    (msg: string) => dataSubject.error(msg),
  );
  dataSubject
    .pipe(
      mergeMap((result: ParseResult) => {
        if (result.data) {
          totalRowsParsed++;
          return of(result.data);
        }
        if (result.isError) {
          parseError++;
        }
        if (result.isComplete) {
          dataSubject.complete();
        }
        return EMPTY;
      }),
      bufferTime(opts.bufferTime!, undefined, opts.batchSize!),
      // Stop Buffering if no more events.
      mergeMap((events) => {
        return events.length > 0 ? of(events) : EMPTY;
      }),
      mergeMap(async (events) => {
        if (aborted) {
          // skip processing inflight messages.
          return {size: 0, succeeded: false};
        }
        let succeeded = await postToServer(
          events as Array<object>,
          dataSourceId,
          opts.timeout!,
          dataType!,
        );
        parserProducer.produce(events.length);
        return {size: events.length, succeeded};
      }, opts.maxConcurrency),
      scan(
        ({success, error}, data) => {
          const deltaSuccess = data.succeeded ? data.size : 0;
          const deltaError = data.succeeded ? 0 : data.size;
          success += deltaSuccess;
          error += deltaError;
          let total = success + error;
          // TODO: Figure out a more accurate way to calculate percent.
          let percent = (total * 100.0) / totalRowsParsed;
          let totalError = deltaError + parseError; //post errors + parse errors.
          parseError = 0;
          msgHandle({
            type: 'progress',
            data: {success: deltaSuccess, error: totalError, percent},
          });
          return {success, error};
        },
        {success: 0, error: 0},
      ),
      catchError(async (err) => {
        msgHandle({type: 'error', data: {msg: err}});
        aborted = true;
        return err;
      }),
    )
    .subscribe({
      complete: () => {
        signalComplete(aborted);
      },
    });
  // start Initial produce
  parserProducer.produce(opts.maxQueueSize!);
  return () => {
    aborted = true;
    dataSubject.complete();
    parserProducer.cancel();
    signalComplete(aborted);
  };
}
