import { Injectable, NgZone } from '@angular/core';
import { rpcFilter, SupabaseService } from "./supabase.service";
import { BusinessService } from "./business.service";
import { finalize, shareReplay, switchMap, take } from "rxjs/operators";
import { Observable } from "rxjs";
import { Workflow } from "../models/workflow.model";
import { RealtimeChannel } from "@supabase/supabase-js";
import { frontToServerTranslation, serverToFrontTranslation } from 'projects/common/src/lib/services/supabase.service';
import { UsersService } from './users.service';

/**
 * Exposes workflow data of the currently selected business as observables
 * that are kept up to date through Supabase Realtime `postgres_changes`
 * subscriptions, plus a helper to update a workflow's job type.
 */
@Injectable({
  providedIn: 'root'
})
export class WorkflowService {

    /** Shared workflow streams, keyed by workflow id, so concurrent consumers reuse one channel. */
    private _observables: { [id: string]: Observable<Workflow | null> } = {};

    constructor(
        private businessService: BusinessService,
        private supabaseService: SupabaseService,
        private usersService: UsersService,
        private ngZone: NgZone
    ) {}

    /**
     * Returns a shared stream of the workflow row with the given id in the
     * selected business' schema.
     *
     * The stream emits the row once it has been fetched (or `null` if the
     * fetch reports an error), then re-emits on every realtime INSERT/UPDATE
     * and emits `null` on DELETE. When the selected business changes, the
     * previous channel is removed and the row is re-fetched from the new schema.
     *
     * @param id Workflow id to watch.
     * @param prefix Prepended to the realtime channel name. It is only applied
     *   when the stream for `id` is first created; while that stream is cached,
     *   later calls get the same stream regardless of the prefix they pass.
     * @returns A replaying, ref-counted observable of the workflow (or `null`).
     */
    workflowObservable(id: number, prefix = '') {
        if (!this._observables[id]) {
            const shared$: Observable<Workflow | null> = this.businessService.selectedBusiness$.pipe(
                switchMap(business => new Observable<Workflow | null>(subscriber => {
                    // Initial snapshot of the row.
                    this.supabaseService.supabase
                        .schema(business.businessId)
                        .from('workflow')
                        .select()
                        .eq('id', id)
                        .single()
                        .then(res => {
                            if (res.error) {
                                subscriber.next(null);
                            } else {
                                subscriber.next(serverToFrontTranslation(res.data as Workflow));
                            }
                        });

                    // Live updates for that single row.
                    const channel: RealtimeChannel = this.supabaseService.supabase
                        .channel(prefix + '_workflow_' + id)
                        .on(
                            'postgres_changes',
                            { event: '*', schema: business.businessId, table: 'workflow', filter: rpcFilter('id', 'eq', id) },
                            (changes) => {
                                switch (changes.eventType) {
                                    case 'INSERT':
                                    case 'UPDATE':
                                        subscriber.next(serverToFrontTranslation(changes.new as Workflow));
                                        break;
                                    case 'DELETE':
                                        subscriber.next(null);
                                        break;
                                }
                            }
                        );
                    channel.subscribe();

                    // Teardown: runs on business switch and when the last consumer leaves.
                    return () => {
                        void this.supabaseService.supabase.removeChannel(channel);
                    };
                })),
                // Evict the cache entry only when the shared stream itself is torn
                // down. Doing this per inner subscription would drop the entry on
                // every business switch while the stream is still in use, letting
                // the next caller build a duplicate stream and channel.
                finalize(() => {
                    if (this._observables[id] === shared$) {
                        delete this._observables[id];
                    }
                }),
                shareReplay({ bufferSize: 1, refCount: true })
            );
            this._observables[id] = shared$;
        }
        return this._observables[id];
    }

    /**
     * Returns a stream of the user ids attached to a workflow, as reported by
     * the `get_workflow_users` RPC of the selected business' schema.
     *
     * After the initial fetch, a realtime channel keeps the list current:
     * inserted `workflow_user` rows and `job_user` rows updated to an
     * `accepted` / `looped_in` status append their user id locally, while
     * deletions and `estimate` updates trigger a full re-fetch. Emissions are
     * delivered inside the Angular zone.
     *
     * The business and current user are read once per subscription; the
     * channel is removed when the last subscriber unsubscribes.
     *
     * @param id Workflow id whose users should be tracked.
     * @returns A replaying, ref-counted observable of user ids.
     */
    workflowUsersIdsObservable(id: number) {
        return new Observable<number[]>(observer => {
            // Per-subscription state, so a re-subscription starts clean.
            let channel: RealtimeChannel | null = null;
            let cancelled = false;
            let ids: number[] = [];

            const emit = (nextIds: number[]) => {
                if (cancelled)
                    return;
                ids = nextIds;
                this.ngZone.run(() => observer.next(nextIds));
            };

            const getDataAndSetObserver = async () => {
                const businessId = (await this.businessService.selectedBusiness$.pipe(take(1)).toPromise()).businessId;
                const res = await this.supabaseService.supabase.schema(businessId).rpc('get_workflow_users', frontToServerTranslation({ inId: id }));
                if (res.error) {
                    console.warn(res.error);
                    return;
                }
                emit(res.data as number[]);
            };

            // Fire-and-forget re-fetch used by realtime callbacks; failures are
            // logged rather than surfacing as unhandled promise rejections.
            const refresh = () => {
                getDataAndSetObserver().catch(error => console.warn(error));
            };

            const addIdAndSetObserver = (userId: number) => {
                if (ids.includes(userId))
                    return;
                emit([...ids, userId]);
            };

            const subscribeToChanges = async () => {
                const businessId = (await this.businessService.selectedBusiness$.pipe(take(1)).toPromise()).businessId;
                const currentUserId = (await this.usersService.currentUser$.pipe(take(1)).toPromise()).id;
                // The consumer may have unsubscribed while we were awaiting; creating
                // the channel now would leave it open with nobody to remove it.
                if (cancelled)
                    return;
                channel = this.supabaseService.supabase
                    // The workflow id is part of the name so that streams for different
                    // workflows do not share one channel topic on the same client.
                    .channel(`workflow_users_${currentUserId}_${id}`)
                    .on(
                        'postgres_changes',
                        { event: 'INSERT', schema: businessId, table: 'workflow_user', filter: `workflow_id=eq.${id}` },
                        (payload) => addIdAndSetObserver(serverToFrontTranslation(payload.new).userId)
                    )
                    // NOTE(review): the DELETE listeners carry no filter, so any deletion in
                    // the table triggers a re-fetch — presumably because Supabase Realtime
                    // cannot filter DELETE events; confirm.
                    .on(
                        'postgres_changes',
                        { event: 'DELETE', schema: businessId, table: 'workflow_user' },
                        refresh
                    )
                    .on(
                        'postgres_changes',
                        { event: 'UPDATE', schema: businessId, table: 'job_user', filter: `workflow_id=eq.${id}` },
                        (payload) => {
                            const data = serverToFrontTranslation(payload.new);
                            if (['accepted', 'looped_in'].includes(data.acceptanceStatus))
                                addIdAndSetObserver(data.assigneeUserId);
                        }
                    )
                    .on(
                        'postgres_changes',
                        { event: 'DELETE', schema: businessId, table: 'job_user' },
                        refresh
                    )
                    .on(
                        'postgres_changes',
                        { event: 'UPDATE', schema: businessId, table: 'estimate', filter: `workflow_id=eq.${id}` },
                        refresh
                    )
                    .subscribe();
            };

            getDataAndSetObserver()
                .then(() => subscribeToChanges())
                .catch(error => console.warn(error));

            return () => {
                cancelled = true;
                if (channel) {
                    void this.supabaseService.supabase.removeChannel(channel);
                    channel = null;
                }
            };
        })
        .pipe(
            // refCount is required: without it shareReplay keeps the source
            // subscription alive after the last consumer leaves, so the teardown
            // above would never run and the channel would stay open.
            shareReplay({ bufferSize: 1, refCount: true }),
        );
    }

    /**
     * Updates the `jobType` of a workflow row in the currently selected business.
     *
     * @param workflowId Id of the workflow to update.
     * @param jobType New job type value.
     * @returns Whatever `SupabaseService.update` resolves to.
     */
    async updateJobType(workflowId: number, jobType: string) {
        const business = await this.businessService.selectedBusiness$.pipe(take(1)).toPromise();
        return this.supabaseService.update(business.businessId, 'workflow', workflowId, { jobType });
    }

}
