import { createPromise, haveSameElements, Publisher, timeout } from '@prospective/pms-js-utils'

/**
 * Error thrown into a stream's generator when the stream is aborted.
 *
 * Instances carry `isAbortError === true` so consumers can tell a deliberate
 * abort from a genuine failure without relying on `instanceof`.
 */
export class StreamAbortError extends Error {
    /**
     * @param {string} [message] Human-readable abort reason.
     * @param {{ cause?: unknown }} [options] Standard `Error` options (e.g. `cause`).
     */
    constructor(message, options) {
        super(message, options)
        // Without an explicit name, logs and `error.name` checks report plain "Error".
        this.name = 'StreamAbortError'
        this.isAbortError = true
    }
}

const statusSymbol = Symbol('status')

/**
 * Minimal promise-like ("thenable") factory used as the base object of stream instances.
 *
 * Unlike a native Promise, the returned value is a plain extensible object, so callers
 * can attach extra members to it. It can be `await`ed because it exposes `then`.
 *
 * Behaviour:
 * - The executor runs synchronously and receives `(resolve, reject)`.
 * - The first call to `resolve`/`reject` settles the object; later calls are ignored.
 * - Calling `reject` while no handler is registered throws
 *   `Error('Unhandled rejection', { cause })` synchronously from `reject`. The object is
 *   still marked rejected, so handlers attached afterwards receive the reason.
 * - `catch(onRejected)` is equivalent to `then(undefined, onRejected)`, so a fulfilled
 *   value passes through it.
 * - The `[statusSymbol]` property reports the current state: 'pending', 'fulfilled'
 *   or 'rejected'.
 *
 * @param {function(function(*): void, function(*): void): void} executor
 * @returns {{ then: function, catch: function }}
 */
const P = executor => {
    let resolveHandlers = []
    let rejectHandlers = []
    let result
    let reason
    let status = 'pending'

    const onResolve = value => {
        // Settle once: a late resolve must not re-run handlers or flip the state.
        if (status !== 'pending') return
        result = value
        status = 'fulfilled'
        const handlers = resolveHandlers
        resolveHandlers = []
        rejectHandlers = []
        handlers.forEach(resolveHandler => resolveHandler(value))
    }
    const onReject = error => {
        // Settle once: a late reject must not re-run handlers or flip the state.
        if (status !== 'pending') return
        reason = error
        status = 'rejected'
        const handlers = rejectHandlers
        resolveHandlers = []
        rejectHandlers = []
        if (handlers.length === 0) throw new Error('Unhandled rejection', { cause: error })
        handlers.forEach(rejectHandler => rejectHandler(error))
    }

    const thenFunction = (onFulfilled, onRejected) => P((resolve, reject) => {
        const internalResolveHandler = value => {
            // The async wrapper turns a synchronous throw or a returned promise from the
            // callback into a settlement of the derived P.
            const getValue = async () => {
                const callback = typeof onFulfilled === 'function' ? onFulfilled : passthrough => passthrough
                return await callback(value)
            }
            getValue().then(resolve, reject)
        }
        const internalRejectHandler = error => {
            const getValue = async () => {
                const callback = typeof onRejected === 'function' ? onRejected : () => { throw error }
                return await callback(error)
            }
            getValue().then(resolve, reject)
        }
        if (status === 'fulfilled') {
            internalResolveHandler(result)
        } else if (status === 'rejected') {
            internalRejectHandler(reason)
        } else {
            // Only queue while pending; a settled P would never drain the queues.
            resolveHandlers.push(internalResolveHandler)
            rejectHandlers.push(internalRejectHandler)
        }
    })

    // Routing through `then` lets a fulfilled value pass through instead of leaving the
    // derived P pending forever.
    const catchFunction = onRejected => thenFunction(undefined, onRejected)

    executor(onResolve, onReject)
    return {
        then: thenFunction,
        catch: catchFunction,
        // Getter instead of a value: a plain property would freeze the state at creation.
        get [statusSymbol]() {
            return status
        }
    }
}
/**
 * Creates a pending P together with the functions that settle it.
 * @returns {Array} A `[p, resolve, reject]` tuple.
 */
P.withResolvers = () => {
    const settlers = {}
    const instance = P((onResolve, onReject) => {
        // The executor runs synchronously, so both settlers are captured before returning.
        settlers.resolve = onResolve
        settlers.reject = onReject
    })
    return [instance, settlers.resolve, settlers.reject]
}

/**
 * @typedef {
 *  {
 *      subscribe: function(callback: function),
 *      unsubscribe: function(callback: function),
 *  }
 * } Publisher
 */

/**
 * @typedef {
 *  { abort: function(reason): Promise } & Publisher & Promise
 * } StreamInstanceInterface
 */

/**
 * @typedef {
 * { compose: function(StreamDefinition): StreamDefinition }
 * & Publisher
 * & function(...arguments: any[]): StreamInstanceInterface } StreamDefinition
 */

/**
 * Creates a stream definition: a function that, when called, starts the given async
 * generator and returns a promise-like stream instance.
 *
 * One Publisher is created per definition (not per call). The definition and every
 * instance it creates expose that same Publisher's `subscribe`/`unsubscribe`, so a
 * subscriber receives the values published by all instances of the definition.
 *
 * Each instance is a `P` object extended with:
 * - `abort(reason)`: throws a StreamAbortError into the generator,
 * - `nextValue`: getter for the promise of the value currently being produced,
 * - `subscribe` / `unsubscribe`,
 * - `[Symbol.asyncIterator]`: built by `StreamIterator`,
 * - `reduce(callback, initialValue)` and `toArray()`.
 * The instance settles with the generator's return value, or rejects with the error
 * the generator throws.
 * @function
 * @param {AsyncGeneratorFunction} generatorFunction
 * @returns {StreamDefinition}
 */
export const Stream = generatorFunction => {
    // Publisher() is destructured as [subscription interface, publish function].
    const [onData, publishOnData] = Publisher()
    const stream = function() {
        const args = Array.from(arguments)
        const generator = generatorFunction(...args)
        let aborted = false
        const [p, resolve, reject] = P.withResolvers()
        let nextValuePromise
        // Drives the generator to completion, publishing every result (including the
        // final `done` one) and returning the generator's return value.
        const execute = async generator => {
            let result
            do {
                const nextResultPromise = generator.next()
                // A fresh promise per step backs the `nextValue` getter.
                const [nextValue, resolveNextValue] = createPromise()
                nextValuePromise = nextValue
                result = await nextResultPromise
                resolveNextValue(result.value)
                // After abort() the generator may still produce results; they are not published.
                if (!aborted)
                    publishOnData(result.value, result.done)
            }
            while (!result.done)
            return result.value
        }
        execute(generator).then(resolve, error => reject(error))
        // Throws a StreamAbortError into the generator. If that throw is not handled by
        // the generator, the resulting error rejects the instance. Returns the instance.
        p.abort = async reason => {
            aborted = true
            try {
                await generator.throw(new StreamAbortError(reason || 'Abort'))
            } catch (error) {
                reject(error)
            }
            return p
        }
        Reflect.defineProperty(p, 'nextValue', {
            get: () => nextValuePromise
        })
        p.subscribe = onData.subscribe
        p.unsubscribe = onData.unsubscribe
        p[Symbol.asyncIterator] = StreamIterator(p)
        // Starts a new stream that yields the running accumulator for each iterated value
        // and returns the accumulator folded with this instance's final (resolved) value.
        p.reduce = (callback, initialValue) => Stream(async function* () {
            let accumulator = initialValue
            let iteration = 0
            for await (const value of p) {
                const nextAccumulator = callback(accumulator, value, iteration)
                yield nextAccumulator
                accumulator = nextAccumulator
                iteration += 1
            }
            const value = await p
            return callback(accumulator, value, iteration)
        })()
        // Collects the iterated values; the generator's return value is not included
        // because `for await` ignores an iterator's return value.
        p.toArray = async () => {
            const result = []
            for await (const value of p)
                result.push(value)
            return result
        }
        return p
    }
    stream.subscribe = onData.subscribe
    stream.unsubscribe = onData.unsubscribe
    // Applies a decorator (a function from stream definition to stream definition).
    stream.compose = component => component(stream)
    return stream
}

/**
 * Builds the async-iterator factory for a stream instance.
 *
 * The iterator subscribes to the stream and links incoming values into a chain of
 * promises: each link resolves to `[value, nextLink, done]`. The "done" marker travels
 * with the link it belongs to, so a consumer slower than the producer still receives
 * every buffered value before the iteration ends. (A single shared "complete" flag
 * would make such a consumer treat the next buffered value as the final one.)
 *
 * The iteration ends, returning the final value, when a value is published with
 * `done === true` or when the stream resolves. It throws when the stream rejects.
 * The subscription is released however the iteration ends: completion, an error, or
 * the consumer leaving the loop early.
 *
 * @param {StreamInstanceInterface} stream
 * @returns {AsyncGeneratorFunction}
 */
export const StreamIterator = stream => async function* () {
    // `createPromise()` is used as `[promise, resolve, reject]` throughout.
    let [next, resolveNext, rejectNext] = createPromise()
    const onData = (data, done) => {
        const [promise, resolve, reject] = createPromise()
        resolveNext([ data, promise, Boolean(done) ])
        // After the final value the chain ends; the tail resolvers stay as they are.
        if (done) return
        next = promise
        resolveNext = resolve
        rejectNext = reject
    }
    stream.subscribe(onData)
    stream
        .then(result => {
            stream.unsubscribe(onData)
            // No effect if a `done` value already closed the chain; otherwise this
            // terminates the chain with the resolved value.
            resolveNext([ result, undefined, true ])
        })
        .catch(error => rejectNext(error))
    try {
        do {
            const [ value, nextValuePromise, done ] = await next
            if (done) return value
            yield value
            next = nextValuePromise
        } while (next)
    } finally {
        // Also runs when the consumer breaks out of `for await` or the stream rejects.
        stream.unsubscribe(onData)
    }
}

/**
 * Stream decorator that subscribes `callback` to the given stream and returns the
 * stream unchanged.
 * @param {function} callback Subscriber passed to `stream.subscribe`.
 * @returns {function(stream): stream}
 */
export const Observer = callback => stream => {
    stream.subscribe(callback)
    return stream
}

/**
 * Observes several streams at once. Every time one of them publishes a state,
 * `callback` receives a fresh array holding the latest state of each stream at the
 * stream's index. Indexes of streams that have not published yet are empty.
 *
 * Each stream is unsubscribed once it settles, whether it fulfils or rejects.
 *
 * @param {Array} streams Awaitable objects exposing `subscribe` and `unsubscribe`.
 * @param {function(Array): void} callback Receives the combined state snapshot.
 * @returns {Promise} Resolves when every stream has fulfilled; rejects with the first
 *  stream failure. Callers that previously ignored the (undefined) return value may
 *  keep doing so.
 */
Observer.observeAll = (streams, callback) => {
    let streamsState = new Array(streams.length)
    const observations = streams.map(async (stream, index) => {
        const observer = state => {
            // Copy-on-write: every callback invocation gets its own snapshot array.
            streamsState = streamsState.concat()
            streamsState[index] = state
            callback(streamsState)
        }
        stream.subscribe(observer)
        try {
            await stream
        } finally {
            // Without `finally`, a rejected stream would keep its observer subscribed.
            stream.unsubscribe(observer)
        }
    })
    return Promise.all(observations)
}

// Expose the Observer decorator and the state-name constants on the Stream namespace.
Object.assign(Stream, {
    Observer,
    PENDING: 'PENDING',
    FULFILLED: 'FULFILLED',
    REJECTED: 'REJECTED'
})

// Stream definition whose instances yield nothing and return `value`.
Stream.resolve = value => Stream(async function* () {
    return value
})
// Stream definition whose instances throw `error` as soon as they run.
Stream.reject = error => Stream(async function* () {
    throw error
})

/**
 * Wraps a transform so it can be used as a stream decorator: the returned function
 * forwards its single `stream` argument to `transformFunction` and returns the result.
 * @param {function} transformFunction
 * @returns {function(stream): *}
 */
export const Partial = transformFunction => {
    const applyTransform = stream => transformFunction(stream)
    return applyTransform
}

// Latest running instance per wrapped stream definition.
const abortableStreamEntries = new Map()

/**
 * Stream decorator: starting the decorated stream first aborts (with reason
 * 'Self-abort') the instance registered by an earlier call of the same wrapped stream,
 * then runs a new instance and relays its values and result.
 * @param stream Stream definition to wrap.
 * @returns {StreamDefinition}
 */
export const SelfAbortable = stream => Stream(async function* (...args) {
    const previousInstance = abortableStreamEntries.get(stream)
    if (previousInstance) {
        try {
            await previousInstance.abort('Self-abort')
            await previousInstance
        } catch (error) {
            // Deliberately ignored: the previous run is expected to end with the abort error.
        }
    }

    const currentInstance = stream(...args)
    abortableStreamEntries.set(stream, currentInstance)

    yield* currentInstance
    const outcome = await currentInstance
    abortableStreamEntries.delete(stream)
    return outcome
})

/**
 * Default comparator: strict (`===`) equality.
 */
export const ReferenceComparator = (value1, value2) => {
    return value1 === value2
}

/**
 * Builds a comparator for two argument lists. The lists match when they have the same
 * length and every pair of values at the same position satisfies the comparator found
 * at that position in `comparatorsArray`, falling back to ReferenceComparator when
 * that slot is missing or falsy.
 */
const ArrayComparator = (comparatorsArray = []) => (previousArguments, currentArguments) => {
    if (previousArguments.length !== currentArguments.length) return false
    return previousArguments.every((previousValue, position) => {
        const compare = comparatorsArray.at(position) || ReferenceComparator
        return compare(previousValue, currentArguments[position])
    })
}

// Cache entries per wrapped stream definition, shared by every memoizer in this module.
let memoizeEntries = new Map()

/**
 * Creates a memoizing stream decorator. Calling the decorated stream with arguments
 * that match a cached call replays the cached stream instance instead of starting a
 * new one.
 *
 * @param {function|Array} [comparator] Either a function `(cachedArgs, args) => boolean`
 *  or a value handed to ArrayComparator (per-position comparators).
 * @param {number} [size] When numeric, the oldest entry is dropped once the number of
 *  entries for a stream exceeds it.
 * @returns {function} The decorator; its `invalidate()` removes every entry this
 *  memoizer created.
 */
export const Memoizer = (comparator, size) => {
    const memoizer = (stream = undefined) => Stream(async function* (...args) {
        const matches = typeof comparator === 'function' ? comparator : ArrayComparator(comparator)

        if (!memoizeEntries.has(stream)) memoizeEntries.set(stream, [])
        const cachedCalls = memoizeEntries.get(stream)

        const hit = cachedCalls.find(candidate => matches(candidate.arguments, args))
        if (hit) {
            yield* hit.streamInstance
            return await hit.streamInstance
        }

        const streamInstance = stream(...args)
        cachedCalls.push({
            arguments: args,
            stream,
            streamInstance,
            memoizer
        })
        const exceedsLimit = !isNaN(size) && cachedCalls.length > size
        if (exceedsLimit) memoizeEntries.set(stream, cachedCalls.slice(1))

        yield* streamInstance
        return await streamInstance
    })

    memoizer.invalidate = () => {
        for (const [cachedStream, cachedCalls] of memoizeEntries) {
            const survivors = cachedCalls.filter(cachedCall => cachedCall.memoizer !== memoizer)
            memoizeEntries.set(cachedStream, survivors)
        }
    }
    return memoizer
}

// Factory for a memoizer without a size limit, using the default comparator.
const createUnlimitedMemoizer = () => Memoizer()
// Factory for a memoizer that keeps a single entry per stream.
const createLastCallMemoizer = () => Memoizer(undefined, 1)

// Ready-made decorators; `getMemoizer()` returns a fresh, independent memoizer of the same kind.
export const Memoized = createUnlimitedMemoizer()
Memoized.getMemoizer = createUnlimitedMemoizer

export const MemoizedLast = createLastCallMemoizer()
MemoizedLast.getMemoizer = createLastCallMemoizer

// Latest debounce entry ({ aborted, streamInstance? }) per wrapped stream definition.
let debouncedEntries = new Map()

/**
 * Stream decorator that delays the start of the wrapped stream by `delay` (passed to
 * `timeout`). A newer call marks the previous entry as aborted: a previous call still
 * waiting then throws StreamAbortError('Debounce'), and a previous call that already
 * started its stream gets that stream aborted with reason 'Self-abort'.
 * @param {number} [delay=0]
 * @returns {function(stream): StreamDefinition}
 */
export const Debounced = (delay = 0) => stream => Stream(async function* (...args) {
    const previousEntry = debouncedEntries.get(stream)
    if (previousEntry) {
        previousEntry.aborted = true
        if (previousEntry.streamInstance) {
            debouncedEntries.delete(stream)
            try {
                await previousEntry.streamInstance.abort('Self-abort')
                await previousEntry.streamInstance
            } catch (error) {
                // Deliberately ignored: the previous run is expected to end with the abort error.
            }
        }
    }

    const currentEntry = { aborted: false }
    debouncedEntries.set(stream, currentEntry)

    await timeout(delay)
    // A newer call arrived while this one was waiting.
    if (currentEntry.aborted) throw new StreamAbortError('Debounce')

    const currentInstance = stream(...args)
    currentEntry.streamInstance = currentInstance

    yield* currentInstance
    const outcome = await currentInstance
    debouncedEntries.delete(stream)
    return outcome
})

// Executions in flight: { stream, streamInstance, args }.
let sharedStreamEntries = []
/**
 * Stream decorator that shares one execution between overlapping calls: when the
 * decorated stream is called while an earlier call of the same wrapped stream with the
 * same arguments (per `haveSameElements`) is still registered, no new instance is
 * created and the registered one is relayed instead. The registration is removed once
 * the relayed instance has been consumed or has failed.
 * @param stream
 * @returns {StreamDefinition}
 */
export const Shared = stream => Stream(async function* (...args) {
    const isSameCall = candidate =>
        candidate.stream === stream &&
        haveSameElements(candidate.args, args)

    let entry = sharedStreamEntries.find(isSameCall)
    if (!entry) {
        entry = { stream, streamInstance: stream(...args), args }
        sharedStreamEntries.push(entry)
    }

    const unregister = () => {
        sharedStreamEntries = sharedStreamEntries.filter(item => entry !== item)
    }

    let outcome
    try {
        yield* entry.streamInstance
        outcome = await entry.streamInstance
    } catch (error) {
        unregister()
        throw error
    }
    unregister()
    return outcome
})

/**
 * Stream decorator that awaits `fn(...args)` before starting the wrapped stream with
 * the same arguments, then relays the wrapped stream's values and result.
 * @param {function} fn Called with the stream arguments; its result is awaited and discarded.
 * @returns {function(stream): StreamDefinition}
 */
export const Await = fn => stream => Stream(async function* (...args) {
    await fn(...args)
    const wrappedInstance = stream(...args)
    yield* wrappedInstance
    const outcome = await wrappedInstance
    return outcome
})

/**
 * Stream decorator that replaces the call arguments: `fn(...args)` is awaited and its
 * result (an argument list; an empty list when the result is falsy) is spread into the
 * wrapped stream.
 * @param {function} fn Maps the original arguments to the new argument list.
 * @returns {function(stream): StreamDefinition}
 */
export const MapParameters = fn => stream => Stream(async function* (...args) {
    const mapped = await fn(...args)
    const parameters = mapped || []
    const wrappedInstance = stream(...parameters)
    yield* wrappedInstance
    return await wrappedInstance
})

/**
 * Stream decorator that turns a failure of the wrapped stream into a result: any error
 * thrown while starting or relaying the wrapped stream is passed to `errorHandler`,
 * whose return value becomes the result of the decorated stream.
 * @param {function(*): *} errorHandler
 * @returns {function(stream): StreamDefinition}
 */
export const HandleError = errorHandler => stream => Stream(async function* (...parameters) {
    let wrappedInstance
    try {
        wrappedInstance = stream(...parameters)
        yield* wrappedInstance
        return await wrappedInstance
    } catch (failure) {
        return errorHandler(failure)
    }
})

/**
 * Yields one `{ index, status, value | reason }` record per promise, in the order the
 * promises settle. Settlements are queued, so promises that settle in the same tick,
 * or while the consumer is busy, are never lost.
 */
const settlementsInOrder = async function* (promises) {
    const settlements = []
    let notify = null
    const record = settlement => {
        settlements.push(settlement)
        if (notify) {
            notify()
            notify = null
        }
    }
    promises.forEach((promise, index) => {
        Promise.resolve(promise).then(
            value => record({ index, value, status: 'fulfilled' }),
            reason => record({ index, reason, status: 'rejected' })
        )
    })
    for (let remaining = promises.length; remaining > 0; remaining -= 1) {
        // Sleep only while the queue is empty; `record` wakes the loop up.
        if (settlements.length === 0) await new Promise(resolve => { notify = resolve })
        yield settlements.shift()
    }
}

/**
 * Stream definitions built from an array of promises.
 *
 * - `all(promises)`: each time a promise fulfils, yields a new array holding the values
 *   fulfilled so far at their original indexes (`undefined` elsewhere). Returns the
 *   final array. Throws the rejection reason as soon as a rejection is reached.
 * - `allSettled(promises)`: yields `{ value, status: 'fulfilled' }` or
 *   `{ reason, status: 'rejected' }` for every promise, in settlement order.
 *
 * The previous implementation reused one resolver (`all`) and filtered out every
 * settled promise after each `Promise.race` (`allSettled`), so promises settling
 * together were dropped, and `all` could then wait forever.
 */
Stream.fromPromises = ({
    all: Stream(async function* (promises) {
        let result = new Array(promises.length)
        for await (const settlement of settlementsInOrder(promises)) {
            if (settlement.status === 'rejected') throw settlement.reason
            // Every step yields a fresh array so earlier snapshots stay untouched.
            const nextResult = [...result]
            nextResult[settlement.index] = settlement.value
            result = nextResult
            yield result
        }
        return result
    }),
    allSettled: Stream(async function* (promises) {
        for await (const settlement of settlementsInOrder(promises)) {
            if (settlement.status === 'fulfilled')
                yield { value: settlement.value, status: 'fulfilled' }
            else
                yield { reason: settlement.reason, status: 'rejected' }
        }
    })
})

/**
 * Builds an async generator function that yields the outcome of every given promise,
 * in the order the promises settle:
 * `{ value, status: 'fulfilled' }` or `{ reason, status: 'rejected' }`.
 *
 * Outcomes are queued as they arrive, so promises that settle in the same tick, or
 * while the consumer is busy with a previous outcome, are all delivered. (Racing the
 * pending promises and then dropping every settled one would lose all but the first.)
 *
 * @param {Array<Promise>} promises Promises (or plain values, treated as fulfilled).
 * @returns {AsyncGeneratorFunction} Yields exactly `promises.length` outcomes.
 */
export const generatorFromPromises = promises => {
    const generatorFunction = async function*() {
        const outcomes = []
        let wake = null
        const enqueue = outcome => {
            outcomes.push(outcome)
            if (wake) {
                wake()
                wake = null
            }
        }
        promises.forEach(promise => {
            Promise.resolve(promise).then(
                value => enqueue({ value, status: 'fulfilled' }),
                reason => enqueue({ reason, status: 'rejected' })
            )
        })
        for (let remaining = promises.length; remaining > 0; remaining -= 1) {
            // Sleep only while the queue is empty; `enqueue` wakes the loop up.
            if (outcomes.length === 0) await new Promise(resolve => { wake = resolve })
            yield outcomes.shift()
        }
    }
    return generatorFunction
}
