import {rpc} from '@pbe/runtime';
import {GrpcStatusCode} from './goog-grpc-status-code';
import {GRPCWebOptions} from './grpc-web.options';
import {JWTService} from '../jwt.service';
import {JWTInterceptor} from '../injectables/jwt-interceptor';
import {Inject, Injectable} from '@angular/core';
import {GRPC_WEB_TRANSPORT_OPTIONS} from './grpc-web.module';

/**
 * Values of the one-byte flag field that starts every gRPC-Web frame.
 */
export enum GrpcWebFrame {
    /** Flag byte of a frame carrying a serialized message. */
    DATA = 0,
    /** Flag byte with only the most significant bit set (0x80): a trailer frame. */
    TRAILER = 1 << 7,
}

/**
 * Wraps a serialized message in a single gRPC-Web data frame.
 *
 * Layout of the result: one flag byte (`GrpcWebFrame.DATA`), the message length as a
 * big-endian unsigned 32-bit integer, then the message bytes themselves.
 *
 * @param message serialized message bytes
 * @returns a new array of `message.length + 5` bytes
 */
function getBody(message: Uint8Array): Uint8Array {
    const HEADER_SIZE = 5;
    const frame = new Uint8Array(HEADER_SIZE + message.length);
    const view = new DataView(frame.buffer);

    view.setUint8(0, GrpcWebFrame.DATA);
    view.setUint32(1, message.length, false); // false = big-endian, the DataView default
    frame.set(message, HEADER_SIZE);

    return frame;
}

/** Header name → a single value, or a list of values when the name occurs more than once. */
type HttpHeaders = Record<string, string | string[]>;

/**
 * `rpc.Transport` implementation that speaks the gRPC-Web wire format over `fetch`.
 *
 * Every request emitted on `this.requests` whose method is not client-streaming is POSTed to
 * `/<service fqn>/<method name>`; decoded response messages are pushed to `rpcRequest.resp`.
 * Requests for client-streaming methods (`method.reqStream`) are ignored by this transport.
 *
 * Failures are reported through `rpcRequest.resp.error(...)` with one of:
 *  - the rejection reason of `fetch` / the body reader / the message decoder,
 *  - the `Response` itself for non-2xx HTTP statuses,
 *  - the *name* of a `GrpcStatusCode` for gRPC-level failures.
 */
@Injectable()
export class GRPCWebTransport extends rpc.Transport {
    /** Number of re-authentication attempts already made for requests that were answered with 401. */
    retries = new Map<rpc.Request<any, any>, number>();

    /**
     * @param opts transport options. `deadline` (a number or an object with `getTime()`) is treated
     *             as an absolute timestamp in epoch milliseconds and converted into a `grpc-timeout`
     *             header; `abort` is passed to `fetch` as its `signal`.
     * @param jwt  handed to `JWTInterceptor.handleAuthError` when the server answers with HTTP 401.
     */
    constructor(@Inject(GRPC_WEB_TRANSPORT_OPTIONS) opts: GRPCWebOptions,
                                                    jwt: JWTService) {
        super();

        this.requests.subscribe((rpcRequest: rpc.Request<any, any>) => {
            // Client-streaming methods are not handled here.
            if (rpcRequest.method.reqStream) return;

            const url = `/${(rpcRequest.service.constructor as any)._fqn}/${rpcRequest.method.name}`;
            rpcRequest.req.subscribe(msg => {
                const reqHeaders = new Headers();
                reqHeaders.set('Content-Type', 'application/grpc-web+proto');
                reqHeaders.set('X-Grpc-Web', '1');
                if (opts.deadline) {
                    const ts = typeof opts.deadline === 'number' ? opts.deadline : opts.deadline.getTime();
                    const timeout = GRPCWebTransport._encodeTimeout(ts - Date.now());
                    if (timeout !== undefined) reqHeaders.set('grpc-timeout', timeout);
                }
                const jwtToken = localStorage.getItem('jwt-token');
                if (jwtToken) reqHeaders.set('Authorization', `JWT ${jwtToken}`);

                globalThis.fetch(url, {method: 'POST', headers: reqHeaders, body: getBody(new rpcRequest.method.req(msg).toBinary()), signal: opts.abort ?? null})
                    .then((res: Response) => {
                        if (res.status === 401) {
                            const attempts = this.retries.get(rpcRequest) ?? 0;

                            if (attempts > 2) {
                                this.retries.delete(rpcRequest);
                                return rpcRequest.resp.error(res);
                            }

                            this.retries.set(rpcRequest, attempts + 1);
                            // Re-emitting the message on `req` re-enters this subscriber and resends the call.
                            return JWTInterceptor.handleAuthError(jwt, res).subscribe({
                                next: () => rpcRequest.req.next(msg),
                                error: (err: unknown) => {
                                    this.retries.delete(rpcRequest);
                                    rpcRequest.resp.error(err);
                                },
                            });
                        }

                        // Any non-401 answer ends the retry cycle; drop the counter so the map cannot grow unbounded.
                        this.retries.delete(rpcRequest);

                        if (!res.ok) return rpcRequest.resp.error(res);

                        rpcRequest.req.complete();

                        // Defensive check for response types whose headers and body are not usable.
                        if (res.type === 'error' || res.type === 'opaque' || res.type === 'opaqueredirect') {
                            return rpcRequest.resp.error(GrpcStatusCode[GrpcStatusCode.UNKNOWN]);
                        }

                        const respHeaders: HttpHeaders = {};
                        res.headers.forEach((value, key) => {
                            const e = respHeaders[key];
                            if (typeof e === 'string') respHeaders[key] = [e, value];
                            else if (Array.isArray(e)) e.push(value);
                            else respHeaders[key] = value;
                        });

                        // `grpc-message` is only validated; the error payload stays the status-code name.
                        if (Array.isArray(respHeaders['grpc-message'])) {
                            return rpcRequest.resp.error(GrpcStatusCode[GrpcStatusCode.INTERNAL]);
                        }

                        // A `grpc-status` header means the status was delivered in the response headers.
                        let code = GrpcStatusCode.OK;
                        const s = respHeaders['grpc-status'];
                        if (s !== undefined) {
                            if (Array.isArray(s)) return rpcRequest.resp.error(GrpcStatusCode[GrpcStatusCode.INTERNAL]);
                            code = parseInt(s, 10);
                            // NaN and out-of-range values have no reverse enum mapping.
                            if (GrpcStatusCode[code] === undefined) return rpcRequest.resp.error(GrpcStatusCode[GrpcStatusCode.INTERNAL]);
                        }

                        if (code !== GrpcStatusCode.OK) return rpcRequest.resp.error(GrpcStatusCode[code]);
                        if (!res.body) return rpcRequest.resp.error(GrpcStatusCode[GrpcStatusCode.INTERNAL]);

                        // Returned so that a failing read or decode reaches the `.catch` below.
                        return this._readGrpcWebResponseBody(res.body, rpcRequest);
                    })
                    // Must come last: covers the fetch rejection as well as anything thrown while handling the response.
                    .catch(err => rpcRequest.resp.error(err));
            });
        });
    }

    /**
     * Formats a remaining duration as a `grpc-timeout` header value (integer of at most 8 digits
     * followed by a unit: `m` milliseconds, `S` seconds, `M` minutes, `H` hours).
     *
     * @param ms remaining time in milliseconds; values below 1 are sent as `1m` so that an
     *           already expired deadline still yields a well-formed header
     * @returns the header value, or `undefined` when `ms` is not a finite number
     */
    private static _encodeTimeout(ms: number): string | undefined {
        if (!Number.isFinite(ms)) return undefined;

        const maxValue = 99999999;
        const units: Array<[string, number]> = [['m', 1], ['S', 1000], ['M', 60000], ['H', 3600000]];
        const remaining = Math.max(1, ms);
        for (const [suffix, size] of units) {
            const value = Math.ceil(remaining / size);
            if (value <= maxValue) return `${value}${suffix}`;
        }
        return `${maxValue}H`;
    }

    /**
     * Extracts the `grpc-status` value from the payload of a trailer frame, which is a block of
     * `name: value` lines.
     *
     * @returns the parsed status (possibly `NaN` when malformed), or `undefined` when the trailer
     *          has no `grpc-status` line
     */
    private static _parseTrailerStatus(payload: Uint8Array): number | undefined {
        const text = new TextDecoder().decode(payload);
        for (const line of text.split(/\r?\n/)) {
            const sep = line.indexOf(':');
            if (sep < 0) continue;
            if (line.slice(0, sep).trim().toLowerCase() === 'grpc-status') {
                return parseInt(line.slice(sep + 1).trim(), 10);
            }
        }
        return undefined;
    }

    /**
     * Reads gRPC-Web frames from the response body and forwards them to `rpcRequest.resp`.
     *
     * - DATA frames are decoded with `method.resp.fromBinary` and emitted; for a non-streaming
     *   response the observable completes right after the first message.
     * - A TRAILER frame ends the call: a non-OK `grpc-status` is reported as an error (status-code
     *   name), otherwise the response completes.
     * - Any other flag byte (e.g. a compressed frame) is reported as `INTERNAL`, because the
     *   payload cannot be decoded.
     *
     * The returned promise rejects when reading the stream or decoding a message fails; the
     * caller is expected to forward that rejection to `rpcRequest.resp.error`.
     */
    private async _readGrpcWebResponseBody(stream: ReadableStream<Uint8Array>, rpcRequest: rpc.Request<any, any>) {
        let streamReader: {next(): Promise<ReadableStreamReadResult<Uint8Array>>},
            byteQueue: Uint8Array = new Uint8Array(0);

        if (typeof stream.getReader === 'function') {
            const whatWgReadableStream = stream.getReader();
            streamReader = {
                next: () => whatWgReadableStream.read(),
            };
        } else {
            // Fallback for stream objects that are only async-iterable.
            streamReader = (stream as unknown as AsyncIterable<Uint8Array>)[Symbol.asyncIterator]() as any;
        }

        for (;;) {
            const result = await streamReader.next();

            if (result.value !== undefined) {
                // Append the new chunk to whatever is left over from previous reads.
                const n = new Uint8Array(byteQueue.length + result.value.length);
                n.set(byteQueue);
                n.set(result.value, byteQueue.length);
                byteQueue = n;

                // Each frame: 1 flag byte + 4-byte big-endian payload length + payload.
                while (byteQueue.length >= 5) {
                    const header = new DataView(byteQueue.buffer, byteQueue.byteOffset, byteQueue.byteLength);
                    const flags = header.getUint8(0);
                    const frameLen = header.getUint32(1);

                    // Frame not fully received yet: wait for more bytes.
                    if (byteQueue.length - 5 < frameLen) break;

                    const payload = byteQueue.subarray(5, 5 + frameLen);
                    byteQueue = byteQueue.subarray(5 + frameLen);

                    if (flags === GrpcWebFrame.DATA) {
                        rpcRequest.resp.next((rpcRequest.method.resp as any).fromBinary(payload));
                        if (!rpcRequest.method.respStream) return rpcRequest.resp.complete();
                    } else if ((flags & GrpcWebFrame.TRAILER) !== 0) {
                        const status = GRPCWebTransport._parseTrailerStatus(payload);
                        if (status === undefined || status === GrpcStatusCode.OK) return rpcRequest.resp.complete();
                        return rpcRequest.resp.error(GrpcStatusCode[status] ?? GrpcStatusCode[GrpcStatusCode.INTERNAL]);
                    } else {
                        return rpcRequest.resp.error(GrpcStatusCode[GrpcStatusCode.INTERNAL]);
                    }
                }
            }

            if (result.done) break;
        }

        return rpcRequest.resp.complete();
    }
}
