import { NgZone } from '@angular/core';

import { Observable, Observer, Subscription } from 'rxjs';
import { async } from 'rxjs/scheduler/async';
import { publishReplay, refCount } from 'rxjs/operators';

// TODO: Reuse this in other places where we poll
// TODO: Eventually this should use WebSocket when the backend supports it
/**
 * Turns a promise-returning task into a shared, replaying polling stream.
 *
 * The first run is scheduled with no delay; every later run is scheduled only
 * after the previous one has settled, so calls to `work` never overlap.
 * Results are emitted through `zone.run`, while the scheduling itself is done
 * through `zone.runOutsideAngular`.
 *
 * @param work Task to poll. It is invoked without arguments.
 * @param zone Angular zone used to emit values and to schedule the polling.
 * @param frequency Delay between runs, in seconds. A falsy value (e.g. `0`)
 *     is treated as 5.
 * @param fallback Value emitted in place of a result when `work` rejects or
 *     throws. When it is omitted, `null` is emitted instead.
 * @returns An observable shared between subscribers via `publishReplay` and
 *     `refCount`. Failures of `work` never error the stream.
 */
export function streamify<T, R = T>(
    work: (...args: any[]) => Promise<T>,
    zone: NgZone,
    frequency = 5,
    fallback?: R
): Observable<T | R> {
    // Delay between two runs. A falsy frequency falls back to 5 seconds.
    const intervalMs = (frequency || 5) * 1000;

    const source: Observable<T> = Observable.create((observer: Observer<T>) => {
        let isUnsubscribed = false;
        let subscription: Subscription | null = null;

        // Schedule a single run; the run itself schedules the next one.
        // NOTE: Every next call is deferred until the previous has been completed.
        const schedule = (immediate = false): void => {
            subscription = async.schedule(
                async () => {
                    // Loosely typed: the failure path may produce the fallback (R) or null.
                    let result: any;
                    try {
                        result = await work();
                    } catch (err) {
                        // If we had a failed request,
                        // send the fallback instead of failing the pipeline.
                        // Compare against `undefined` so that falsy fallbacks
                        // such as 0, '' or false are still delivered.
                        result = fallback !== undefined ? fallback : null;
                    }

                    // The consumer may have unsubscribed while the request was
                    // in flight; do not re-enter the zone for a dead stream.
                    if (isUnsubscribed) {
                        return;
                    }

                    zone.run(() => {
                        observer.next(result);
                    });

                    // A consumer may unsubscribe synchronously while handling
                    // the value; stop making further requests in that case.
                    if (isUnsubscribed) {
                        return;
                    }

                    // Schedule a new update.
                    zone.runOutsideAngular(() => {
                        schedule();
                    });
                },
                immediate ? 0 : intervalMs
            );
        };

        // Initiate stream
        zone.runOutsideAngular(() => {
            schedule(true);
        });

        return (): void => {
            isUnsubscribed = true;
            // Clear the current scheduled work.
            if (subscription) {
                subscription.unsubscribe();
                subscription = null;
            }
        };
    });

    // NOTE(review): `publishReplay()` is called without a buffer size, so every
    // polled value is retained and replayed to late subscribers. If only the
    // latest value is needed, consider `publishReplay(1)` — confirm with callers.
    return publishReplay<T>()(source).pipe(refCount());
}
