import { Injectable } from '@angular/core';
import { BehaviorSubject, fromEvent, interval, Observable, switchMap, take, tap } from "rxjs";
import { filter, map } from "rxjs/operators";
import { SessionStatus } from "../../openapi/portal/models/session-status";
import { SessionsService } from "../../openapi/portal/services/sessions.service";
import { Session } from "../../openapi/portal/models/session";
import { environment } from "../../environments/environment";

/**
 * Tracks the state of a backend session, either by polling the sessions API
 * or by listening to the session's server-sent event stream, and exposes the
 * latest human-readable progress text through `progress$`.
 */
@Injectable({
  providedIn: 'root'
})
export class ConversationService {
  /** Text shown before any progress arrives or when a progress payload cannot be read. */
  private readonly loadingMessage = "Loading ...";
  /** Raw `progress` value that marks the end of the event stream. */
  private readonly doneMessage = "done";
  /** Latest progress text; starts with the loading message and is updated by `listenSession`. */
  progress$ = new BehaviorSubject<string>(this.loadingMessage);

  constructor(private sessionsService: SessionsService) {
  }

  /**
   * Polls the session once per second until its status is no longer `Running`.
   *
   * @param sessionId identifier of the session to fetch
   * @returns an observable that emits the first non-running session and then completes
   */
  pollSession(sessionId: string): Observable<Session> {
    // NOTE(review): switchMap unsubscribes from the in-flight request whenever the
    // next tick fires, so a request that takes longer than the 1s interval is
    // cancelled before it can deliver. If the endpoint can be that slow, consider
    // `exhaustMap` (skip ticks while a request is pending) instead.
    return interval(1000).pipe(
      switchMap(() => this.sessionsService.getSession({session_id: sessionId})),
      filter((session) => session.status !== SessionStatus.Running),
      take(1),
    );
  }

  /**
   * Listens to the session's `/subscribe` event stream, publishing every progress
   * update to `progress$`. Once a message with `progress === "done"` arrives, the
   * stream is closed and the final session is fetched.
   *
   * The EventSource is opened on subscription (not when this method is called) and
   * is always closed on completion, error or unsubscribe, so an abandoned or
   * never-subscribed result does not leave a connection open.
   *
   * @param sessionId identifier of the session to listen to
   * @returns an observable that emits the fetched session once and then completes
   */
  listenSession(sessionId: string): Observable<Session> {
    return new Observable<Session>(subscriber => {
      const eventSource = new EventSource(environment.rootApiUrl + '/sessions/' + sessionId + '/subscribe');

      const subscription = fromEvent<MessageEvent>(eventSource, 'message').pipe(
        map(event => JSON.parse(event.data)),
        tap(data => this.progress$.next(this.prepareProgress(data.progress))),
        filter(data => data.progress === this.doneMessage),
        // Only the first "done" message matters; take(1) also detaches the message
        // listener and lets the resulting observable complete after the fetch.
        take(1),
        tap(() => eventSource.close()),
        switchMap(() => this.sessionsService.getSession({session_id: sessionId})),
      ).subscribe(subscriber);

      // Teardown: runs on completion, error and explicit unsubscribe.
      return () => {
        subscription.unsubscribe();
        eventSource.close();
      };
    });
  }

  /**
   * Converts a raw `progress` value from the event stream into display text.
   *
   * @param raw either the done marker or a JSON document with an `active_step`
   *   string and an optional `functions` array of `{function_name, active_step}`
   * @returns `'Done'` for the done marker; the active step followed by one
   *   `<br/>`-separated line per function; or the loading message when the
   *   payload cannot be parsed or has no string `active_step`
   */
  private prepareProgress(raw: string): string {
    if (raw === this.doneMessage) {
      return 'Done';
    }
    try {
      const parsed = JSON.parse(raw);
      const activeStep: unknown = parsed?.['active_step'];
      if (typeof activeStep !== 'string') {
        // Valid JSON without a usable step (e.g. a number, null or {}): keep the
        // placeholder instead of returning undefined from a string-typed method.
        return this.loadingMessage;
      }
      let text = activeStep;
      const functions: unknown = parsed['functions'];
      if (Array.isArray(functions)) {
        // NOTE(review): values are interpolated into markup unescaped; confirm the
        // consumer sanitizes this text before rendering it as HTML.
        for (const f of functions) {
          text += `<br/>-- ${f['function_name']}: ${f['active_step']}`;
        }
      }
      return text;
    } catch (e) {
      // Malformed JSON or an unexpected payload shape: fall back to the placeholder.
      return this.loadingMessage;
    }
  }
}
