File

projects/netgrif-components-core/src/lib/process/process.service.ts

Description

Process service is responsible for loading and caching processes needed for any functionality of an app.

Index

Properties
Methods
Accessors

Constructor

constructor(_petriNetResource: PetriNetResourceService, _log: LoggerService)
Parameters :
Name Type Optional
_petriNetResource PetriNetResourceService No
_log LoggerService No

Methods

Public areNetsLoaded
areNetsLoaded(identifiers: Array)
Parameters :
Name Type Optional
identifiers Array<string> No
Returns : boolean
Public getNet
getNet(identifier: string, forceLoad)

Get process net by identifier. (unless another is already pending)

Parameters :
Name Type Optional Default value Description
identifier string No

Identifier of the requested process. See {

forceLoad No false

when set to true cached processes will be ignored and a backend request will always be made (unless another is already pending)

Returns : Observable<Net>

Observable of [the process]{

Public getNetReference
getNetReference(identifier: string)

Get process net reference by identifier.

PetriNetReferences are not cached. Each call will result in a new backend request unless a request for the same net is already pending.

Parameters :
Name Type Optional Description
identifier string No

Identifier of the requested process. See {

Returns : Observable<PetriNetReferenceWithPermissions>

Observable of [the process]{

Public getNetReferences
getNetReferences(identifiers: Array)

Get process net referencess according to provided identifiers.

PetriNetReferences are not cached. Each call will result in a new backend request unless a request for the same net is already pending. If any of the processes failed to load it is skipped from the result.

Parameters :
Name Type Optional Description
identifiers Array<string> No

Array of identifiers of requested processes. See {

Returns : Observable<Array<PetriNetReferenceWithPermissions>>

Observable of array of loaded processes. Array is emitted only when every process finished loading. If any of the processes failed to load it is skipped from the result.

Public getNets
getNets(identifiers: Array, forceLoad)

Get process nets according to provided identifiers. If any of the requested processes is not cached it will be loaded from the server and saved for later. (unless another is already pending) If any of the processes failed to load it is skipped from the result.

Parameters :
Name Type Optional Default value Description
identifiers Array<string> No

Array of identifiers of requested processes. See {

forceLoad No false

when set to true cached processes will be ignored and a backend request will always be made (unless another is already pending)

Returns : Observable<Array<Net>>

Observable of array of loaded processes. Array is emitted only when every process finished loading. If any of the processes failed to load it is skipped from the result.

Public isNetLoaded
isNetLoaded(identifier: string)
Parameters :
Name Type Optional
identifier string No
Returns : boolean
Protected loadNet
loadNet(id: string)
Parameters :
Name Type Optional
id string No
Returns : Observable<Net>
Protected loadNetReference
loadNetReference(id: string)
Parameters :
Name Type Optional
id string No
Returns : Observable<PetriNetReference>
Protected loadRoles
loadRoles(id: string)
Parameters :
Name Type Optional
id string No
Returns : Observable<RolesAndPermissions>
Protected loadTransactions
loadTransactions(id: string)
Parameters :
Name Type Optional
id string No
Protected loadTransitions
loadTransitions(id: string)
Parameters :
Name Type Optional
id string No
ngOnDestroy
ngOnDestroy()
Returns : void
Protected publishUpdate
publishUpdate(net?: Net)
Parameters :
Name Type Optional
net Net Yes
Returns : void
Public removeNet
removeNet(identifier: string)

Remove cached process by identifier. If the process is not found nothing happens.

Parameters :
Name Type Optional Description
identifier string No

Process identifier

Returns : void
Public updateNet
updateNet(net: Net)

Update cached process object. If the process is not found nothing happens. Process object is replaced.

Parameters :
Name Type Optional Description
net Net No

Updated process object.

Returns : void

Properties

Protected Readonly _nets
Type : NetCache
Protected _netsSubject
Type : Subject<NetCache>
Protected _netUpdate
Type : Subject<Net>
Protected _referenceRequestCache
Type : Map<string | ReplaySubject<PetriNetReferenceWithPermissions>>
Protected _requestCache
Type : Map<string | ReplaySubject<Net>>
Public Readonly LATEST
Type : string
Default value : 'latest'

Accessors

nets$
getnets$()

Stream of change of the process cache. New state of cache is emitted every time the cached changed by inserting, updating or deleting a process.

netUpdate$
getnetUpdate$()

Stream of change in the process cache. New state of cache is emitted every time the cached changed by inserting, updating or deleting a process.

Returns : Observable<Net>
import {Injectable, OnDestroy} from '@angular/core';
import {forkJoin, Observable, of, ReplaySubject, Subject} from 'rxjs';
import {Net} from './net';
import {PetriNetResourceService} from '../resources/engine-endpoint/petri-net-resource.service';
import {LoggerService} from '../logger/services/logger.service';
import Transition from './transition';
import Transaction from './transaction';
import {catchError, map, switchMap, tap} from 'rxjs/operators';
import RolesAndPermissions from './rolesAndPermissions';
import {PetriNetReference} from '../resources/interface/petri-net-reference';
import {PetriNetReferenceWithPermissions} from './petri-net-reference-with-permissions';

export interface NetCache {
    [k: string]: Net;
}

/**
 * Process service is responsible for loading and caching processes needed for any functionality of an app.
 */
@Injectable({
    providedIn: 'root'
})
export class ProcessService implements OnDestroy {

    protected readonly _nets: NetCache;
    protected _netsSubject: Subject<NetCache>;
    protected _netUpdate: Subject<Net>;
    protected _requestCache: Map<string, ReplaySubject<Net>>;
    protected _referenceRequestCache: Map<string, ReplaySubject<PetriNetReferenceWithPermissions>>;
    public readonly LATEST = 'latest';

    constructor(private _petriNetResource: PetriNetResourceService, private _log: LoggerService) {
        this._nets = {};
        this._netsSubject = new Subject<NetCache>();
        this._netUpdate = new Subject<Net>();
        this._requestCache = new Map<string, ReplaySubject<Net>>();
        this._referenceRequestCache = new Map<string, ReplaySubject<PetriNetReferenceWithPermissions>>();
    }

    ngOnDestroy(): void {
        this._netsSubject.complete();
        this._netUpdate.complete();
        Array.from(this._requestCache.values()).forEach(net => net.complete());
        Array.from(this._referenceRequestCache.values()).forEach(net => net.complete());
    }

    /**
     * Get process nets according to provided identifiers.
     * If any of the requested processes is not cached it will be loaded from the server and saved for later.
     * @param identifiers Array of identifiers of requested processes. See {@link Net}
     * @param forceLoad when set to `true` cached processes will be ignored and a backend request will always be made
     * (unless another is already pending)
     * @returns Observable of array of loaded processes. Array is emitted only when every process finished loading.
     * If any of the processes failed to load it is skipped from the result.
     */
    public getNets(identifiers: Array<string>, forceLoad = false): Observable<Array<Net>> {
        if (identifiers.length === 0) {
            return of([]);
        }
        return forkJoin(identifiers.map(i => {
            return this.getNet(i, forceLoad);
        })).pipe(
            map(nets => nets.filter(n => !!n)),
            tap(nets => {
                if (nets.length === 0) {
                    return;
                }
                this._netsSubject.next(this._nets);
                nets.forEach(n => this._netUpdate.next(n));
            })
        );
    }

    /**
     * Get process net by identifier.
     * @param identifier Identifier of the requested process. See {@link Net}
     * @param forceLoad when set to `true` cached processes will be ignored and a backend request will always be made
     * (unless another is already pending)
     * @returns Observable of [the process]{@link Net}. Process is loaded from a server or picked from the cache.
     */
    public getNet(identifier: string, forceLoad = false): Observable<Net> {
        if (!forceLoad && this._nets[identifier]) {
            this._log.debug(`returning net '${identifier}' from cache`);
            return of(this._nets[identifier]);
        }
        if (this._requestCache.has(identifier)) {
            this._log.debug(`returning net '${identifier}' from pending requests`);
            return this._requestCache.get(identifier).asObservable();
        }
        this._log.debug(`retrieving net '${identifier}' from backend`);
        this._requestCache.set(identifier, new ReplaySubject<Net>(1));
        return this.loadNet(identifier).pipe(
            tap(net => {
                const s = this._requestCache.get(identifier);
                if (s) {
                    s.next(net);
                    s.complete();
                    this._requestCache.delete(identifier);
                }
                if (net) {
                    this.publishUpdate(net);
                }
            })
        );
    }

    /**
     * Get process net referencess according to provided identifiers.
     *
     * `PetriNetReferences` are not cached.
     * Each call will result in a new backend request unless a request for the same net is already pending.
     * @param identifiers Array of identifiers of requested processes. See {@link Net}
     * @returns Observable of array of loaded processes. Array is emitted only when every process finished loading.
     * If any of the processes failed to load it is skipped from the result.
     */
    public getNetReferences(identifiers: Array<string>): Observable<Array<PetriNetReferenceWithPermissions>> {
        if (identifiers.length === 0) {
            return of([]);
        }
        return forkJoin(identifiers.map(i => {
            return this.getNetReference(i);
        })).pipe(
            map(references => references.filter(r => !!r))
        );
    }

    /**
     * Get process net reference by identifier.
     *
     * `PetriNetReferences` are not cached.
     * Each call will result in a new backend request unless a request for the same net is already pending.
     * @param identifier Identifier of the requested process. See {@link Net}
     * @returns Observable of [the process]{@link Net}. Process is loaded from a server or picked from the cache.
     */
    public getNetReference(identifier: string): Observable<PetriNetReferenceWithPermissions> {
        if (this._referenceRequestCache.has(identifier)) {
            return this._referenceRequestCache.get(identifier).asObservable();
        }
        this._referenceRequestCache.set(identifier, new ReplaySubject<PetriNetReferenceWithPermissions>(1));
        return this.loadNetReference(identifier).pipe(
            switchMap(ref => {
                if (ref !== null) {
                    return forkJoin({net: of(ref), roles: this.loadRoles(ref.stringId)});
                } else {
                    return of({net: ref, roles: undefined});
                }
            }),
            map(result => {
                if (result.net === null) {
                    return null;
                }
                return {
                    ...result.net,
                    roles: result.roles.processRoles,
                    permissions: result.roles.permissions
                };
            }),
            tap(reference => {
                const s = this._referenceRequestCache.get(identifier);
                if (s) {
                    s.next(reference);
                    s.complete();
                    this._referenceRequestCache.delete(identifier);
                }
            })
        );
    }

    /**
     * Remove cached process by identifier. If the process is not found nothing happens.
     * @param identifier Process identifier
     */
    public removeNet(identifier: string): void {
        if (!this._nets[identifier]) {
            return;
        }
        delete this._nets[identifier];
        this.publishUpdate();
    }

    /**
     * Update cached process object. If the process is not found nothing happens. Process object is replaced.
     * @param net Updated process object.
     */
    public updateNet(net: Net): void {
        if (!this._nets[net.identifier]) {
            return;
        }
        if (!net.transitions.length || !net.transactions.length || !net.roles.length) {
            forkJoin({
                transitions: this.loadTransitions(net.stringId),
                transactions: this.loadTransactions(net.stringId),
                roles: this.loadRoles(net.stringId)
            }).subscribe(values => {
                net.transitions = values.transitions;
                net.transactions = values.transactions;
                net.roles = values.roles.processRoles;
                net.permissions = values.roles.permissions;
                this._nets[net.identifier] = net;
                this.publishUpdate(net);
            }, error => {
                this._log.error('Failed to load part of Petri net ' + net.title, error);
                // throw error;
            });
        } else {
            this._nets[net.identifier] = net;
            this.publishUpdate(net);
        }
    }

    /**
     * Stream of change of the process cache.
     * New state of cache is emitted every time the cached changed by inserting, updating or deleting a process.
     * @returns Observable of whole updated cache.
     */
    public get nets$(): Observable<NetCache> {
        return this._netsSubject.asObservable();
    }

    /**
     * Stream of change in the process cache.
     * New state of cache is emitted every time the cached changed by inserting, updating or deleting a process.
     * @returns Observable of updated or newly loaded process net.
     */
    public get netUpdate$(): Observable<Net> {
        return this._netUpdate.asObservable();
    }

    public areNetsLoaded(identifiers: Array<string>): boolean {
        return identifiers.every(identifier => this.isNetLoaded(identifier));
    }

    public isNetLoaded(identifier: string): boolean {
        return !!this._nets[identifier];
    }

    protected loadNet(id: string): Observable<Net> {
        const returnNet = new ReplaySubject<Net>(1);
        this.loadNetReference(id).subscribe(net => {
            if (net === null) {
                this._log.debug(`loadNetReference for net '${id}' returned null`);
                returnNet.next(null);
                returnNet.complete();
                return;
            }
            this._log.debug(`loading net '${id}' transitions, transactions and roles`);
            forkJoin({
                transitions: this.loadTransitions(net.stringId),
                transactions: this.loadTransactions(net.stringId),
                roles: this.loadRoles(net.stringId)
            }).subscribe(values => {
                this._nets[net.identifier] = new Net(net);
                this._nets[net.identifier].transitions = values.transitions;
                this._nets[net.identifier].transactions = values.transactions;
                this._nets[net.identifier].roles = values.roles.processRoles;
                this._nets[net.identifier].permissions = values.roles.permissions;
                returnNet.next(this._nets[net.identifier]);
                returnNet.complete();
            }, error => {
                this._log.error('Failed to load part of Petri net ' + net.title, error);
                returnNet.next(this._nets[net.identifier]);
                returnNet.complete();
                // throw error;
            });
        });
        return returnNet.asObservable();
    }

    protected loadNetReference(id: string): Observable<PetriNetReference> {
        const returnReference = new ReplaySubject<PetriNetReference>(1);
        this._petriNetResource.getOne(id, this.LATEST).subscribe(reference => {
            returnReference.next(!reference.stringId ? null : reference);
            returnReference.complete();
            return;
        }, error => {
            this._log.error('Failed to load Petri net', error);
            returnReference.next(null);
            returnReference.complete();
        });
        return returnReference.asObservable();
    }

    protected loadTransitions(id: string): Observable<Array<Transition>> {
        return this._petriNetResource.getPetriNetTransitions(id).pipe(
            map(trans => {
                if (trans instanceof Array) {
                    return trans;
                }
                return [];
            }),
            tap(trans => {
                if (trans.length === 0) {
                    this._log.info('References for transitions of net ' + id + ' were not found!');
                }
            }),
            catchError(err => {
                this._log.error('References for transitions of net ' + id + ' failed to load!', err);
                throw err;
            })
        );
    }

    protected loadTransactions(id: string): Observable<Array<Transaction>> {
        return this._petriNetResource.getPetriNetTransactions(id).pipe(
            map(trans => {
                if (trans instanceof Array) {
                    return trans;
                }
                return [];
            }),
            tap(trans => {
                if (trans.length === 0) {
                    this._log.info('References for transactions of net ' + id + ' were not found!');
                }
            }),
            catchError(err => {
                this._log.error('References for transactions of net ' + id + ' failed to load!', err);
                throw err;
            })
        );
    }

    protected loadRoles(id: string): Observable<RolesAndPermissions> {
        return this._petriNetResource.getPetriNetRoles(id).pipe(
            tap(rolesAndPerm => {
                if (rolesAndPerm.processRoles.length === 0) {
                    this._log.info('Roles reference of net ' + id + ' were not found!');
                }
            }),
            catchError(err => {
                this._log.error('Roles reference of net ' + id + ' failed to load!', err);
                throw err;
            })
        );
    }

    protected publishUpdate(net?: Net): void {
        this._netsSubject.next(this._nets);
        if (net) {
            this._netUpdate.next(net);
        }
    }
}

result-matching ""

    No results matching ""