/* eslint-disable import/no-deprecated,max-lines */

import { datadogLogs } from "@datadog/browser-logs";
import {
  filter,
  groupBy,
  map,
  mapValues,
  pick,
  pipe,
  prop,
  toPairs,
  uniqBy,
  values,
} from "remeda";
import {
  canonicalRelationIds,
  canonicalTableIdentifier,
} from "../../utils/graph/canonical";
import { isEqualTables } from "../../utils/graph/lineage";
import type {
  EdgeState,
  LineageColumn,
  LineageColumnId,
  LineageState,
  LineageTable,
} from "../../utils/graph/type";

// Datadog limits the size of a logging payload, so the list of edges with a
// missing column is truncated to this many entries before it is logged.
const MAX_REPORTED_MISSING_COLUMNS = 1500;

// Column props to include in Datadog log payloads in addition to the column's
// ID (which is already logged). Add a prop here when it helps identify the
// specific entry in the raw snapshot.
const COLUMN_METADATA_PROPS: readonly (keyof LineageColumn)[] = ["type"];

/**
 * Cleans up a lineage snapshot before it is rendered.
 *
 * The steps run in this order:
 * 1. Tables that have no columns are dropped.
 * 2. Columns that share a `col_name` within a table are de-duplicated.
 * 3. Tables that share a canonical identifier are collapsed to the first one.
 * 4. Edges that share a canonical relation id are collapsed to a single edge.
 * 5. Edges whose source and destination tables are equal (per `isEqualTables`)
 *    are dropped.
 * 6. Edges whose source or destination cannot be found in the remaining
 *    tables are dropped.
 *
 * Before/after table and edge counts are reported to Datadog. All other
 * properties of the state are passed through untouched.
 *
 * @returns A new state holding the normalized tables and edges.
 */
export function normalizeSnapshot({
  tables,
  edges,
  ...rest
}: LineageState): LineageState {
  const nonEmpty = filterNonEmpty(tables);

  const deduppedCols = removeDuplicateColumns(nonEmpty);

  // Keyed by canonical table identifier; also used below to validate edge ends.
  const tableById = createTablesMapper(deduppedCols);

  const outputTables = Object.values(tableById);

  const deduppedEdges = dedupEdges(edges);

  // Drop edges that start and end on the same table.
  const noLoopEdges = deduppedEdges.filter(
    ({
      sourceColumn: { table_identifier: srcId },
      destinationColumn: { table_identifier: dstId },
    }) => !isEqualTables(srcId, dstId),
  );

  const outputEdges = filterConnectableEdges(noLoopEdges, tableById);

  datadogLogs.logger.info("Prepared data for rendering", {
    tablesBefore: tables.length,
    tablesAfter: outputTables.length,
    edgesBefore: edges.length,
    edgesAfter: outputEdges.length,
  });

  return {
    tables: outputTables,
    edges: outputEdges,
    ...rest,
  };
}

/**
 * Reduces the state to the parts that carry a change status.
 *
 * Only edges with a defined `changeStatus` are kept. A table is kept when it
 * is an end of one of those edges, when it has a `changeStatus` itself, or
 * when at least one of its columns has one. All other properties of the state
 * are passed through untouched.
 */
export function removeIndirectChanges({
  edges,
  tables,
  ...rest
}: LineageState): LineageState {
  const changedEdges = edges.filter((edge) => edge.changeStatus !== undefined);

  // Canonical ids of every table touched by a changed edge.
  const touchedTableIds = new Set<ReturnType<typeof canonicalTableIdentifier>>();
  for (const { sourceColumn, destinationColumn } of changedEdges) {
    touchedTableIds.add(canonicalTableIdentifier(sourceColumn.table_identifier));
    touchedTableIds.add(canonicalTableIdentifier(destinationColumn.table_identifier));
  }

  const relevantTables = tables.filter((table) => {
    if (touchedTableIds.has(canonicalTableIdentifier(table.table_identifier))) {
      return true;
    }
    if (table.changeStatus !== undefined) {
      return true;
    }
    return table.columns.some((column) => column.changeStatus !== undefined);
  });

  return {
    edges: changedEdges,
    tables: relevantTables,
    ...rest,
  };
}

/**
 * Drops tables that have no columns.
 *
 * When any table is dropped, a warning with the number of empty tables and a
 * summary of them is sent to Datadog.
 *
 * @returns The tables that have at least one column, in their original order.
 */
function filterNonEmpty(tables: readonly LineageTable[]): readonly LineageTable[] {
  const nonEmptyTables: LineageTable[] = [];
  const emptyTables: LineageTable[] = [];

  // Partition in a single pass instead of re-scanning the kept tables for
  // every input table to work out which ones were dropped.
  for (const table of tables) {
    if (table.columns.length > 0) {
      nonEmptyTables.push(table);
    } else {
      emptyTables.push(table);
    }
  }

  if (emptyTables.length > 0) {
    datadogLogs.logger.warn("Found empty tables in lineage", {
      count: emptyTables.length,
      emptyTables: summarizeTables(emptyTables),
    });
  }

  return nonEmptyTables;
}

/**
 * De-duplicates the columns of every table by `col_name`.
 *
 * Tables without duplicate columns are returned as the same object; tables
 * with duplicates are replaced by a shallow copy holding the de-duplicated
 * columns. When any table had duplicates, a warning with their count and a
 * summary of the original tables is sent to Datadog.
 */
function removeDuplicateColumns(
  tables: readonly LineageTable[],
): readonly LineageTable[] {
  const offenders: LineageTable[] = [];
  const result: LineageTable[] = [];

  for (const table of tables) {
    const uniqueColumns = uniqBy(table.columns, prop("col_name"));

    if (uniqueColumns.length === table.columns.length) {
      // Nothing was removed, keep the original object.
      result.push(table);
      continue;
    }

    offenders.push(table);
    result.push({ ...table, columns: uniqueColumns });
  }

  if (offenders.length > 0) {
    datadogLogs.logger.warn("Found tables with duplicate columns in lineage", {
      count: offenders.length,
      tablesWithDuplicates: summarizeTables(offenders),
    });
  }

  return result;
}

/**
 * Builds a lookup from canonical table identifier to table.
 *
 * When several tables share the same canonical identifier only the first one
 * is kept, and a warning listing the colliding tables is sent to Datadog.
 *
 * @returns An object keyed by canonical table identifier.
 */
function createTablesMapper(tables: readonly LineageTable[]) {
  const tableById = groupBy.strict(tables, ({ table_identifier }) =>
    canonicalTableIdentifier(table_identifier),
  );
  // Array of `[id, tables]` pairs for the ids that map to more than one table.
  const duplicateTableIds = pipe(
    tableById,
    toPairs.strict,
    filter(([, tablesForId]) => tablesForId.length > 1),
  );
  if (duplicateTableIds.length > 0) {
    datadogLogs.logger.warn("Found multiple tables with the same id", {
      // Number of redundant tables, i.e. every table beyond the first one of
      // each id. Flattening the array of pairs would count the id strings too.
      count: duplicateTableIds.reduce(
        (redundant, [, tablesForId]) => redundant + tablesForId.length - 1,
        0,
      ),
      duplicateTableIds,
    });
  }

  return mapValues(tableById, ([firstTable]) => firstTable);
}

/**
 * Collapses edges that share the same canonical relation id into one edge.
 *
 * Within each group of edges with the same id, the first edge after the
 * leading one that has a `changeStatus` is kept; when there is none, the
 * leading edge is kept. When duplicates exist, a warning with the number of
 * redundant edges and a trimmed copy of the duplicated edges is sent to
 * Datadog.
 */
function dedupEdges(edges: readonly EdgeState[]): readonly EdgeState[] {
  const groupedBy = groupBy.strict(edges, (rel) => canonicalRelationIds(rel).id);
  // Array of `[id, edges]` pairs for the ids that map to more than one edge.
  const dups = pipe(
    groupedBy,
    toPairs.strict,
    filter(([, rels]) => rels.length > 1),
  );
  if (dups.length > 0) {
    datadogLogs.logger.warn("Found multiple edges between the same columns", {
      // Number of redundant edges, i.e. every edge beyond the first one of
      // each id. Flattening the array of pairs would count the id strings too.
      count: dups.reduce((redundant, [, rels]) => redundant + rels.length - 1, 0),
      duplicateRelationIds: pipe(
        dups,
        map(([, rels]) =>
          // Only the selected metadata props of each column are logged, to
          // keep the payload small.
          map(rels, ({ sourceColumn, destinationColumn, ...rest }) => ({
            sourceColumn: pick(sourceColumn, COLUMN_METADATA_PROPS),
            destinationColumn: pick(destinationColumn, COLUMN_METADATA_PROPS),
            ...rest,
          })),
        ),
      ),
    });
  }

  return pipe(
    groupedBy,
    values,
    map(([first, ...arr]) => arr.find((r) => r.changeStatus !== undefined) ?? first),
  );
}

/** Groups the tables by `table_identifier.identifier`, for use in log payloads. */
const summarizeTables = (tables: readonly LineageTable[]) =>
  groupBy.strict(tables, (table) => table.table_identifier.identifier);

/**
 * Keeps only the edges whose source and destination can both be resolved
 * against `tableById`.
 *
 * The source end is checked first; the destination end is only checked for
 * edges that passed the source check.
 */
function filterConnectableEdges(
  edges: readonly EdgeState[],
  tableById: Readonly<Record<string, LineageTable>>,
): readonly EdgeState[] {
  const withKnownSource = filterByEnd(
    edges,
    tableById,
    (edge) => edge.sourceColumn,
    "SOURCE",
  );

  const withKnownEnds = filterByEnd(
    withKnownSource,
    tableById,
    (edge) => edge.destinationColumn,
    "DESTINATION",
  );

  return withKnownEnds;
}

/**
 * Keeps the edges whose selected end can be resolved against `tableById`.
 *
 * An edge is dropped when the table of the selected end is not in `tableById`,
 * or, for edges whose `relationType` is `"relation"`, when that table has no
 * column with the end's `col_name`. Dropped edges are reported to Datadog;
 * the list of edges with a missing column is truncated to
 * `MAX_REPORTED_MISSING_COLUMNS` entries.
 *
 * @param edges - The edges to check.
 * @param tableById - Tables keyed by canonical table identifier.
 * @param endExtractor - Selects which end of an edge is checked.
 * @param label - Name of the checked end, used in the warning messages.
 * @returns The edges that passed the check.
 */
// eslint-disable-next-line max-params
function filterByEnd(
  edges: readonly EdgeState[],
  tableById: Readonly<Record<string, LineageTable>>,
  endExtractor: (rel: EdgeState) => LineageColumnId,
  label: string,
): readonly EdgeState[] {
  const classify = (edge: EdgeState): "good" | "missingTable" | "missingColumn" => {
    const end = endExtractor(edge);
    const table = tableById[canonicalTableIdentifier(end.table_identifier)];

    if (table === undefined) {
      return "missingTable";
    }

    // Column existence is only checked for "relation" edges.
    if (rel_isNotRelation(edge)) {
      return "good";
    }

    const hasColumn = table.columns.some((column) => column.col_name === end.col_name);
    return hasColumn ? "good" : "missingColumn";
  };

  // Local predicate kept next to its single use for readability.
  function rel_isNotRelation(edge: EdgeState): boolean {
    return !(edge.relationType === "relation");
  }

  const buckets = groupBy.strict(edges, classify);

  const withoutTable = buckets.missingTable;
  if (withoutTable !== undefined) {
    datadogLogs.logger.warn(`Found edges with a non-existent ${label} TABLE`, {
      count: withoutTable.length,
      missingTable: withoutTable.map((edge) => canonicalRelationIds(edge).id),
    });
  }

  const withoutColumn = buckets.missingColumn;
  if (withoutColumn !== undefined) {
    const reported = withoutColumn.slice(0, MAX_REPORTED_MISSING_COLUMNS);
    datadogLogs.logger.warn(`Found edges with a non-existent ${label} COLUMN`, {
      count: withoutColumn.length,
      missingColumn: reported.map((edge) => canonicalRelationIds(edge).id),
      isTruncated: withoutColumn.length > MAX_REPORTED_MISSING_COLUMNS,
    });
  }

  return buckets.good ?? [];
}
