import * as tslib_1 from "tslib";
import { Observable } from 'rxjs';
import { async } from 'rxjs/scheduler/async';
import { publishReplay, refCount } from 'rxjs/operators';
// TODO: Reuse this in other places where we poll
// TODO: Eventually this should use WebSocket when the backend supports it
/**
 * Turns a repeatable unit of async work into a shared polling stream.
 *
 * On first subscription `work` is invoked immediately; each subsequent call is
 * scheduled `frequency` seconds after the previous call has settled, so calls
 * never overlap. Every settled call emits one value: the resolved result, or
 * the fallback when `work` throws or rejects. The stream itself never errors
 * because of a failed call.
 *
 * The returned observable is shared between subscribers and replays only the
 * most recent value to late subscribers. Polling stops when the last
 * subscriber unsubscribes.
 *
 * @param {function(): *} work - Produces the next value; may return a promise.
 * @param {{run: Function, runOutsideAngular: Function}} zone - Zone used to
 *     emit values (`run`) and to keep timers out of it (`runOutsideAngular`).
 * @param {number} [frequency=5] - Seconds between polls; falsy values
 *     (including 0) fall back to 5.
 * @param {*} [fallback] - Value emitted when `work` fails; `null` is emitted
 *     when this is left undefined.
 * @returns {Observable} A ref-counted stream of polled values.
 */
export function streamify(work, zone, frequency, fallback) {
    if (frequency === void 0) { frequency = 5; }
    // A falsy frequency (0, null, NaN) would mean back-to-back polling, so
    // it is treated the same as "not provided".
    var intervalMs = (frequency || 5) * 1000;
    // Only a missing fallback maps to null; falsy values such as 0, '' or
    // false are legitimate fallbacks and must be emitted as given.
    var fallbackValue = fallback !== undefined ? fallback : null;
    var source = Observable.create(function (observer) {
        var isUnsubscribed = false;
        var subscription = null;
        var schedule;
        // Runs `work` once, emits its outcome and queues the next run.
        var poll = function () {
            // Calling `work` inside the executor converts a synchronous throw
            // into a rejection, so both failure modes share one handler.
            return new Promise(function (resolve) { resolve(work()); })
                .catch(function () {
                    // A failed request emits the fallback instead of failing
                    // the pipeline.
                    return fallbackValue;
                })
                .then(function (result) {
                    // The consumer may have gone away while the request was in
                    // flight; do not re-enter the zone for a value nobody sees.
                    if (isUnsubscribed) {
                        return;
                    }
                    zone.run(function () {
                        observer.next(result);
                    });
                    // Emitting can itself trigger an unsubscribe; stop polling
                    // in that case.
                    if (isUnsubscribed) {
                        return;
                    }
                    zone.runOutsideAngular(function () {
                        schedule(false);
                    });
                });
        };
        // Queues the next poll, either right away or after the interval.
        schedule = function (immediate) {
            subscription = async.schedule(poll, immediate ? 0 : intervalMs);
        };
        // Initiate the stream with an immediate first poll.
        zone.runOutsideAngular(function () {
            schedule(true);
        });
        return function () {
            isUnsubscribed = true;
            // Clear the currently scheduled work, if any.
            if (subscription) {
                subscription.unsubscribe();
                subscription = null;
            }
        };
    });
    // Replay only the latest value: an unbounded replay buffer would retain
    // every polled value and re-deliver the whole history to late subscribers.
    return publishReplay(1)(source).pipe(refCount());
}
