Fixing wamo agent

This commit is contained in:
Martin Karkowski 2021-01-04 19:12:17 +01:00
parent 2b6d90ffdc
commit b1a2ca30a6
14 changed files with 899 additions and 109 deletions

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-11-11 13:27:58
* @modify date 2020-12-03 14:00:03
* @modify date 2021-01-04 10:08:29
* @desc [description]
*/
@ -186,5 +186,15 @@ export async function runNopeBackend() {
// If requested As Main => Perform the Operation.
if (require.main === module) {
// Subscribe to unhandled Reactions.
process.on("unhandledRejection", (reason, p) => {
console.log("Unhandled Rejection at: Promise", p, "reason:", reason);
console.error(reason);
// application specific logging, throwing an error, or other logic here
// Forward the error
throw reason;
});
runNopeBackend().catch(console.error);
}

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-10-12 18:52:00
* @modify date 2021-01-03 16:17:40
* @modify date 2021-01-04 13:19:55
* @desc [description]
*/
@ -1688,9 +1688,7 @@ export class nopeDispatcher implements INopeDispatcher {
(data: IExternalEventMsg) => {
// Test if the Content, which has been forwared in here inst the own dispathcer.
if (data.sender != _this.id) {
if (!observable.setContent(data.data, _this.id, data.timestamp)) {
observable.forcePublish();
}
observable.setContent(data.data, _this.id, data.timestamp);
}
},
{
@ -1714,10 +1712,7 @@ export class nopeDispatcher implements INopeDispatcher {
const observer = _externalSource.subscribe({
next(data: IExternalEventMsg) {
if (_this.id !== data.sender) {
// Externally force if required
if (!observable.setContent(data.data, _this.id, data.timestamp)) {
observable.forcePublish();
}
observable.setContent(data.data, _this.id, data.timestamp);
}
},
complete() {
@ -1750,11 +1745,11 @@ export class nopeDispatcher implements INopeDispatcher {
// First try to get current source-data (which in theory should be)
// Up-to-date.
// If this data isnt available try to use the lastly published data.
const _sourceData = _externalSource.getContent();
const _sourceData = this._lastPublishedEvent.get(_subTopic);
const data =
_sourceData !== null && _sourceData !== undefined
? _sourceData
: this._lastPublishedEvent.get(_subTopic);
: _externalSource.getContent();
if (data) {
if (!observable.setContent(data.data, _this.id, data.timestamp)) {
observable.forcePublish();
@ -1784,6 +1779,22 @@ export class nopeDispatcher implements INopeDispatcher {
if (_this.subscriptionExists(_pubTopic)) {
// Use the Communicator to emit the Event.
_this.communicator.emitEvent(_pubTopic, msg);
}
// Store the lastly published message, this will be published if
// a new subscription is provided
if (_this._eventsToSendCurrentValueOnSubscription.has(_pubTopic)) {
_this._lastPublishedEvent.set(_pubTopic, msg);
_externalSource.setContent({
data: data,
topic: _pubTopic,
// Watchout => We are using the provided sender here.
// It is used
sender: sender,
type: "event",
timestamp
});
} else if (_externalSource.observerLength > 0) {
// The Observable it is used multiple times internally.
// For this purpose, send the data using the "external source"
@ -1793,17 +1804,13 @@ export class nopeDispatcher implements INopeDispatcher {
_externalSource.setContent({
data: data,
topic: _pubTopic,
// Watchout => We are using the provided sender here.
// It is used
sender: sender,
type: "event",
timestamp
});
}
// Store the lastly published message, this will be published if
// a new subscription is provided
if (_this._eventsToSendCurrentValueOnSubscription.has(_pubTopic)) {
_this._lastPublishedEvent.set(_pubTopic, msg);
}
}
};

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-10-12 18:31:11
* @modify date 2020-12-03 18:36:09
* @modify date 2021-01-04 14:47:20
* @desc [description]
*/
@ -42,7 +42,7 @@ export type IValidPromise<T> = Promise<T> | INopePromise<T>;
export interface IDispatcherInfo {
id: string;
timestamp: number;
status: ENopeDispatcherStatus,
status: ENopeDispatcherStatus;
host: {
cores: number;
cpu: string;
@ -57,7 +57,7 @@ export enum ENopeDispatcherStatus {
HEALTHY = 0,
SLOW = 1,
WARNING = 2,
DEAD = 3
DEAD = 3
}
/**

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-12-31 12:47:20
* @modify date 2020-12-31 13:14:52
* @modify date 2021-01-04 09:06:24
* @desc [description]
*/

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-03-02 11:37:50
* @modify date 2021-01-03 09:28:02
* @modify date 2021-01-04 18:45:37
* @desc [description]
*/
@ -163,7 +163,7 @@ export class BeckhoffPlc extends BasePlcModule implements IPLCModule {
});
} else if (this._client === null) {
const _options = generateAdsOptions(options);
this._logger.warn(
this._logger.debug(
"Creating new ADS-Client " +
_this.identifier +
" with the following settings",

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-11-13 09:53:27
* @modify date 2020-12-30 13:35:02
* @modify date 2021-01-04 17:35:12
* @desc [description]
*/
@ -10,7 +10,10 @@ import * as Logger from "js-logger";
import { connect, MqttClient } from "mqtt";
import { Subscription } from "rxjs";
import { throttleTime } from "rxjs/operators";
import { exportProperty } from "../../../lib/decorators/moduleDecorators";
import {
exportMethod,
exportProperty
} from "../../../lib/decorators/moduleDecorators";
import { SPLITCHAR } from "../../../lib/helpers/objectMethods";
import { replaceAll } from "../../../lib/helpers/stringMethods";
import { getNopeLogger } from "../../../lib/logger/getLogger";
@ -105,7 +108,13 @@ export class MQTTClientModule
protected _logger = getNopeLogger("mqtt-client");
// Element Holding the Subscriptions
private _subscriptions = new Map<string, any>();
private _subscriptions = new Map<
string,
{
callback: () => Promise<void>;
format: "buffer" | "json";
}
>();
/**
* Test if a topic is blocked.
@ -544,6 +553,8 @@ export class MQTTClientModule
resolve(true);
}
_this._firstConnect = false;
_this.connected.setContent(true);
});
// Define a Handler, which is used
@ -621,6 +632,8 @@ export class MQTTClientModule
_this._client.on("offline", () => {
/** Try to Reconnect Client */
_this._logger.warn("MQTT - went Offline");
// Remove the connected Flag.
_this.connected.setContent(false);
});
_this._client.on("error", (_error) => {
@ -636,4 +649,96 @@ export class MQTTClientModule
}
await super.dispose();
}
@exportMethod({
paramsHasNoCallback: true
})
public subscribe(
topic: string,
callback,
options: {
qos: 0 | 1 | 2;
format: "buffer" | "json";
}
): Promise<void> {
const _this = this;
return new Promise<void>((resolve, reject) => {
try {
if (_this._subscriptions.has(topic)) {
reject(new Error("Topic already subscribed"));
} else {
_this._subscriptions.set(topic, {
callback: callback,
format: options.format
});
_this._client.subscribe(
topic,
{
qos: options.qos
},
(err) => {
if (err) reject(err);
else resolve();
}
);
}
} catch (e) {
reject(e);
}
});
}
@exportMethod({
paramsHasNoCallback: true
})
public async unsubscribe(topic: string): Promise<void> {
if (this._client) {
this._client.unsubscribe(topic);
}
}
@exportMethod({
paramsHasNoCallback: true
})
public async publish(
topic: string,
data: Buffer | any,
options: {
retain?: boolean;
qos?: 0 | 1 | 2;
format: "buffer" | "json";
}
): Promise<void> {
const _this = this;
return new Promise<void>((resolve, reject) => {
try {
let _message: Buffer | string = "";
switch (options.format) {
case "buffer":
_message = Buffer.from(JSON.parse(data));
break;
case "json":
_message = data;
break;
}
_this._client.publish(
topic,
_message,
{
retain: options.retain,
qos: options.qos
},
(err) => {
if (err) reject(err);
else resolve();
}
);
} catch (e) {
reject(e);
}
});
}
}

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-12-29 15:42:27
* @modify date 2020-12-29 15:42:31
* @modify date 2021-01-04 14:10:14
* @desc [description]
*/
@ -95,6 +95,16 @@ export interface IMqttSettings {
* @memberof IMqttSettings
*/
subscribeHidden?: boolean;
/**
* If an custom subscription is provided,
* this flag, decides, whether to forward
* the topic as well or not.
*
* @type {boolean}
* @memberof IMqttSettings
*/
exclusiveOnCustomTopics?: boolean;
}
export interface IMQTTClientModule {
uri: INopeObservable<string>;
@ -116,4 +126,25 @@ export interface IMQTTClientModule {
* @memberof IMQTTClientModule
*/
settings: INopeObservable<IMqttSettings>;
publish(
topic: string,
data: Buffer | any,
options: {
retain?: boolean;
qos?: 0 | 1 | 2;
format: "buffer" | "json";
}
): Promise<void>;
subscribe(
topic: string,
callback,
options: {
qos?: 0 | 1 | 2;
format: "buffer" | "json";
}
): Promise<void>;
unsubscribe(topic: string): Promise<void>;
}

View File

@ -0,0 +1,3 @@
export const FLUSH = "TransitionActionFLUSH";
export const STOP_FLUSH = "TransitionActionSTOP_FLUSH";
export const LOAD_CARRIERT = "TransitionActionWTLaden";

View File

@ -2,13 +2,14 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-03-02 11:37:50
* @modify date 2020-12-30 16:50:52
* @modify date 2021-01-04 18:59:33
* @desc [description]
*/
/* eslint-disable no-fallthrough */
import { injectable } from "inversify";
import { promisify } from "util";
import {
exportMethod,
exportProperty
@ -21,23 +22,38 @@ import { INopeObserver } from "../../../lib/types/nope/nopeObservable.interface"
import { IPLCModule } from "../../generic-plc/type/interfaces";
import { generateAdsOptions } from "../../mod-Beckhoff-PLC-Interface/helpers/gen.options";
import { IBeckhoffOptions } from "../../mod-Beckhoff-PLC-Interface/type/interfaces";
import { IMQTTClientModule } from "../../mqtt-connector/types/interfaces";
import {
IMQTTClientModule,
IMqttSettings
} from "../../mqtt-connector/types/interfaces";
import {
IXeticsInterfaceClient,
MESTask
} from "../../xetics-lean-connector/src/xetics.module";
import {
IBaseModuleConnectorReplica,
FLUSH,
LOAD_CARRIERT as LOAD_CARRIER,
STOP_FLUSH
} from "../defaults/commands";
import {
IBaseModuleInformation,
IBaseModuleProcessInformation,
IBaseModuleProductInformation,
IConnectorReplicaBaseModuleToProcessModule,
IWaMOBaseModule,
IWaMOBaseModuleDescription
IWaMOBaseModuleDescription,
IWaMOProcessModule,
IWaMOProcessModuleDescription
} from "../types/interfaces";
export interface PLC extends IGenericNopeModule, IPLCModule {}
export interface MQTT extends IGenericNopeModule, IMQTTClientModule {}
export interface XETICS extends IGenericNopeModule, IXeticsInterfaceClient {}
export interface IPROCESSMODULE
extends IGenericNopeModule,
IWaMOProcessModule {}
const sleep = promisify(setTimeout);
@injectable()
export class WaMOBaseModule
@ -73,10 +89,10 @@ export class WaMOBaseModule
@exportProperty({
mode: "publish",
topic: "connectorReplica",
topic: "connector",
schema: {}
})
public connectorReplica = new NopeObservable<IBaseModuleConnectorReplica>();
public connector = new NopeObservable<IConnectorReplicaBaseModuleToProcessModule>();
/**
* Flag, to show whether the system is connected or not.
@ -95,6 +111,7 @@ export class WaMOBaseModule
protected _mqtt: MQTT;
protected _plc: PLC;
protected _xetics: XETICS;
protected _processModule: IPROCESSMODULE;
protected _observers: INopeObserver[] = [];
/**
@ -143,7 +160,8 @@ export class WaMOBaseModule
checkedInWorkpieceCarrier: string;
tasks: MESTask[];
}) => Promise<string | false>,
waitForInitialized = true
waitForInitialized = true,
autoStart = true
): Promise<void> {
// Define the Author.
this.author = {
@ -186,6 +204,32 @@ export class WaMOBaseModule
type: "XeticsInterfaceClient"
});
// create a MQTT-Connector:
const broker = `mqtt://${options.mqttUri}:1883`;
const mqttParams = [
broker,
{
autoPublish: {
active: false
},
autoSubscribe: false
} as IMqttSettings,
// Waiting for the Connection.
true
];
// create a MQTT-Connector:
this._mqtt = await _this._dispatcher.generateInstance<MQTT>({
identifier: "MQTT_CLIENT_" + options.mqttUri,
params: mqttParams,
type: "MQTTClientModule"
});
// Subscribe to the connection status:
this._mqtt.connected.subscribe(() => {
_this.connected.forcePublish();
});
// const sub = options.host + SPLITCHAR + "fired";
// const prop = await _this._dispatcher.subscribeToEvent(
// sub,
@ -241,6 +285,11 @@ export class WaMOBaseModule
"Release the Product"
);
// Wait 10 Seconds
await sleep(10000);
_this._logger.warn("waited => release");
await _this.releaseCarrier();
}
} else {
@ -274,7 +323,10 @@ export class WaMOBaseModule
_this.baseInformation.forcePublish();
// Log the Change
_this._logger.warn(`Updating property ${propName} to`, value);
_this._logger.debug(
`Updating property ${propName} to`,
value
);
});
}
@ -293,31 +345,34 @@ export class WaMOBaseModule
}
});
// create a MQTT-Connector:
const broker = `mqtt://${options.mqttUri}:1883`;
const mqttParams = [
broker,
const processModuleParams = [
{
autoPublish: true,
autoSubscribe: true
},
// Prevent waiting for the Connection.
false
beckhoff: {
instance: this._plc.identifier,
options: beckhoffParams
},
capability: "unkown",
id: "unkown",
mqtt: {
instance: this._mqtt.identifier,
settings: mqttParams
},
name: "Unkown",
mqttUri: options.mqttUri,
type: "process"
} as IWaMOProcessModuleDescription
];
// create a MQTT-Connector:
const mqtt = await _this._dispatcher.generateInstance<MQTT>({
identifier: "MQTT_CLIENT_" + options.mqttUri,
params: mqttParams,
type: "MQTTClientModule"
});
this._processModule = await _this._dispatcher.generateInstance<IPROCESSMODULE>(
{
identifier: _this.identifier + "_BECKHOFF_PROCESSMODULE_CLIENT",
params: processModuleParams,
type: "WaMOProcessModule"
}
);
// Subscribe to the connection status:
mqtt.connected.subscribe(() => {
_this.connected.forcePublish();
});
this._plc.initialized.forcePublish();
// Not requried
// this._plc.initialized.forcePublish();
} catch (e) {
_this._logger.error("Failed to create a Base-Module.");
_this._logger.error(e);
@ -325,6 +380,54 @@ export class WaMOBaseModule
}
});
this.connector.setContent({
start: false,
automatic: false,
release: false
});
this.connector.subscribe((value) => {
// Only proceed if the mqtt and plc client are available.
if (_this._mqtt && _this._plc) {
const dict = {
start: "bOutputBasisAnProzessmodulProzessStart",
release: "bOutputBasisAnProzessmodulProzessStart",
automatic: "bOutputBasisAnProzessmodulProzessStart"
};
_this._mqtt
.publish(
`${this.config.getContent().host}/baseModule/connectorReplica`,
JSON.stringify(value),
{
format: "json"
}
)
.catch((error) => _this._logger.error(error));
// Define every Element initially.
for (const key in dict) {
// Set the PLC.
_this._plc.dynamicInstanceProperties[
"input.GVL_IOS." + dict[key]
].setContent(value[key]);
// Publish on MQTT.
_this._mqtt
.publish(
`${
this.config.getContent().host
}/baseModule/connectorReplica/${key}`,
JSON.stringify(value[key]),
{
format: "json"
}
)
.catch((error) => _this._logger.error(error));
}
}
});
await super.init();
// Assign the Default Values:
@ -352,30 +455,136 @@ export class WaMOBaseModule
requiredParts: [],
typeId: ""
});
this.connectorReplica.setContent({
start: false
});
// Assign the Config => to update the Values.
this.config.setContent(options);
// If required, wait for the Job.
if (waitForInitialized) {
if (waitForInitialized || autoStart) {
await this.connected.waitFor((value) => value === true);
_this._logger.warn("Resetting PLC-Logic");
this.resetPLC()
.then(async () => {
try {
if (autoStart) {
const carrierID = _this._plc.dynamicInstanceProperties[
"input.GVL_IOS.bInputRFIDTagPaletteUUID"
].getContent();
if (carrierID === "") {
_this._logger.warn("Autostarting Task.");
await this.releaseCarrier();
_this._logger.warn("Released Carrier");
} else {
_this._logger.warn("Found Carrier with ID \"" + carrierID + "\"");
}
}
} catch (e) {
_this._logger.error("Something went wrong during autostart.");
_this._logger.error(e);
}
})
.catch((e) => {
_this._logger.error("Something went wrong during resetting the PLC.");
_this._logger.error(e);
});
}
}
/**
* Release the Carrier.
*
* @return {*} {Promise<void>}
* @memberof WaMOBaseModule
*/
@exportMethod({
paramsHasNoCallback: true
})
public async releaseCarrier(): Promise<void> {
// Release the Carrier
await this._dispatcher.emit(
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
"TransitionActionWTLaden"
JSON.stringify(LOAD_CARRIER),
{
format: "json",
qos: 1
}
);
}
/**
* Release the Carrier.
*
* @return {*} {Promise<void>}
* @memberof WaMOBaseModule
*/
@exportMethod({
paramsHasNoCallback: true
})
public async resetPLC(): Promise<void> {
// Release the Carrier
await this._mqtt.publish(
`${this.config.getContent().host}/reset`,
// For resetting the PLC doesnt required a
"random_content",
{
format: "json",
qos: 1
}
);
// The PLC Usually Initializes for 2000 ms
// Make shure we are waiting log enough
await sleep(3000);
}
/**
* Flush all carriers
*
* @return {*} {Promise<void>}
* @memberof WaMOBaseModule
*/
@exportMethod({
paramsHasNoCallback: true
})
public async flush(): Promise<void> {
// Release the Carrier
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
JSON.stringify(FLUSH),
{
format: "json",
qos: 1
}
);
}
/**
* Stop Flushing
*
* @return {*} {Promise<void>}
* @memberof WaMOBaseModule
*/
@exportMethod({
paramsHasNoCallback: true
})
public async stopFlush(): Promise<void> {
// Release the Carrier
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
JSON.stringify(STOP_FLUSH),
{
format: "json",
qos: 1
}
);
}
/**
* Perform the current Task.
*
* @return {*} {Promise<void>}
* @memberof WaMOBaseModule
*/
@exportMethod({
paramsHasNoCallback: true
})
@ -398,12 +607,8 @@ export class WaMOBaseModule
const promiseOfTask = new Promise((resolve, reject) => {
const finishTask = async () => {
// Reset the Ports:
_this._plc.dynamicInstanceProperties[
"input.GVL_IOS." + "bOutputBasisAnProzessmodulProzessStart"
].setContent(false);
_this.connectorReplica.setContent({
start: false
});
_this.connector.getContent().start = false;
_this.connector.forcePublish();
// Mark the Task as Finished inside of the MES
const finished = await _this._xetics.finishCurrentTask();
@ -448,23 +653,16 @@ export class WaMOBaseModule
observers = [];
};
// Todo Check if the Replica has to be adapted
observers = [
// Subscribe to the physical Port
this._plc.dynamicInstanceProperties[
"input.GVL_IOS." + "bInputProzessmodulAnBasisAktiv"
].once((value) => {
// Extract the Result of the Operation.
const result = _this._plc.dynamicInstanceProperties[
"input.GVL_IOS." + "bInputProzessmodulAnBasisErgebnisIO"
].getContent();
// Clear the Observers
clearObservers();
finishTask().then(resolve).catch(reject);
// Subscribe to the connectorReplica of the Process Module.
_this._processModule.connector.subscribe((value) => {
if (value.active == false) {
// Unsubscribe the Observer
clearObservers();
finishTask().then(resolve).catch(reject);
}
})
// Subscribe to the logic Port (MQTT)
// this._dispatcher.subscribeToEvent(`${this.config.getContent().host}/release`)
];
});
@ -477,12 +675,8 @@ export class WaMOBaseModule
this.baseInformation.forcePublish();
// Enable the Port Replicas.
this._plc.dynamicInstanceProperties[
"input.GVL_IOS." + "bOutputBasisAnProzessmodulProzessStart"
].setContent(true);
this.connectorReplica.setContent({
start: true
});
_this.connector.getContent().start = true;
_this.connector.forcePublish();
// Wait for the Task to Finish
await promiseOfTask;

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-03-02 11:37:50
* @modify date 2020-12-30 13:25:38
* @modify date 2021-01-04 16:38:20
* @desc [description]
*/
@ -21,8 +21,12 @@ import { INopeObserver } from "../../../lib/types/nope/nopeObservable.interface"
import { IPLCModule } from "../../generic-plc/type/interfaces";
import { generateAdsOptions } from "../../mod-Beckhoff-PLC-Interface/helpers/gen.options";
import { IBeckhoffOptions } from "../../mod-Beckhoff-PLC-Interface/type/interfaces";
import { IMQTTClientModule } from "../../mqtt-connector/types/interfaces";
import {
IMQTTClientModule,
IMqttSettings
} from "../../mqtt-connector/types/interfaces";
import { IXeticsInterfaceClient } from "../../xetics-lean-connector/src/xetics.module";
import { FLUSH, STOP_FLUSH } from "../defaults/commands";
import { IWaMOConverterModuleDescription } from "../types/interfaces";
export interface PLC extends IGenericNopeModule, IPLCModule {}
@ -188,9 +192,11 @@ export class WaMOConverterModule extends InjectableNopeBaseModule {
const mqttParams = [
broker,
{
autoPublish: true,
autoSubscribe: true
},
autoPublish: {
active: false
},
autoSubscribe: false
} as IMqttSettings,
// Prevent waiting for the Connection.
false
];
@ -229,9 +235,13 @@ export class WaMOConverterModule extends InjectableNopeBaseModule {
})
public async carrierFromAtoB(): Promise<void> {
// Release the Carrier
await this._dispatcher.emit(
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
"TransitionActionWTLaden"
JSON.stringify("TransitionActionWTLaden"),
{
format: "json",
qos: 1
}
);
}
@ -240,9 +250,13 @@ export class WaMOConverterModule extends InjectableNopeBaseModule {
})
public async carrierFromAtoD(): Promise<void> {
// Release the Carrier
await this._dispatcher.emit(
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
"TransitionActionWTLaden"
JSON.stringify("TransitionActionWTLaden"),
{
format: "json",
qos: 1
}
);
}
@ -251,9 +265,13 @@ export class WaMOConverterModule extends InjectableNopeBaseModule {
})
public async carrierFromCtoB(): Promise<void> {
// Release the Carrier
await this._dispatcher.emit(
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
"TransitionActionWTLaden"
JSON.stringify("TransitionActionWTLaden"),
{
format: "json",
qos: 1
}
);
}
@ -262,9 +280,82 @@ export class WaMOConverterModule extends InjectableNopeBaseModule {
})
public async carrierFromCtoD(): Promise<void> {
// Release the Carrier
await this._dispatcher.emit(
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
"TransitionActionWTLaden"
JSON.stringify("TransitionActionWTLaden"),
{
format: "json",
qos: 1
}
);
}
/**
* Flush all carriers
*
* @return {*} {Promise<void>}
* @memberof WaMOConverterModule
*/
@exportMethod({
paramsHasNoCallback: true
})
public async flush(lowerStoppers = false): Promise<void> {
// Release the Carrier
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
JSON.stringify(FLUSH),
{
format: "json",
qos: 1
}
);
if (lowerStoppers) {
// Iterate over the Elements that has to be lowered.
const ios = [
"bOutputStopperVorWeicheAusfahren",
"bOutputWeicheVorneEinfahren",
"bOutputWeicheHintenEinfahren"
];
for (const io of ios) {
this._plc.dynamicInstanceProperties["input.GVL_IOS." + io].setContent(
true
);
}
}
}
/**
* Stop Flushing
*
* @return {*} {Promise<void>}
* @memberof WaMOConverterModule
*/
@exportMethod({
paramsHasNoCallback: true
})
public async stopFlush(): Promise<void> {
const ios = [
"bOutputStopperVorWeicheAusfahren",
"bOutputWeicheVorneEinfahren",
"bOutputWeicheHintenEinfahren"
];
for (const io of ios) {
this._plc.dynamicInstanceProperties["input.GVL_IOS." + io].setContent(
false
);
}
// Release the Carrier
await this._mqtt.publish(
`${this.config.getContent().host}/release`,
JSON.stringify(STOP_FLUSH),
{
format: "json",
qos: 1
}
);
}
}

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-03-02 11:37:50
* @modify date 2020-12-30 13:33:50
* @modify date 2021-01-04 14:48:00
* @desc [description]
*/

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-12-30 08:50:18
* @modify date 2020-12-30 12:48:51
* @modify date 2021-01-04 16:17:30
* @desc [description]
*/
@ -13,13 +13,15 @@ import { WaMOCarrierMapper } from "./wamo.carrierMapper.module";
import { WaMOConverterModule } from "./wamo.converter.module";
import { WaMOLineConnector } from "./wamo.lineConnector.module";
import { WaMOLineManager } from "./wamo.lineManager.module";
import { WaMOProcessModule } from "./wamo.processmodule.module";
const TYPES = {
wamoLineConnector: Symbol.for("wamoLineConnector"),
wamoLineManager: Symbol.for("WaMOLineManager"),
wamoBaseModule: Symbol.for("wamoBaseModule"),
wamoConverter: Symbol.for("wamoConverter"),
wamoCarrierMapper: Symbol.for("wamoCarrierMapper")
wamoCarrierMapper: Symbol.for("wamoCarrierMapper"),
WaMOProcessModule: Symbol.for("WaMOProcessModule")
};
export const DESCRIPTION: IPackageDescription<typeof TYPES> = {
@ -70,6 +72,17 @@ export const DESCRIPTION: IPackageDescription<typeof TYPES> = {
allowInstanceGeneration: true
}
},
// ProcessModule
{
description: {
name: WaMOProcessModule.prototype.constructor.name.toString(),
selector: TYPES.WaMOProcessModule,
type: WaMOProcessModule
},
settings: {
allowInstanceGeneration: true
}
},
// Converter
{
description: {

View File

@ -0,0 +1,281 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-03-02 11:37:50
* @modify date 2021-01-04 17:04:03
* @desc [description]
*/
import { injectable } from "inversify";
import {
exportMethod,
exportProperty
} from "../../../lib/decorators/moduleDecorators";
import { getNopeLogger } from "../../../lib/logger/getLogger";
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
import { NopeObservable } from "../../../lib/observables/nopeObservable";
import { IGenericNopeModule } from "../../../lib/types/nope/nopeModule.interface";
import { INopeObserver } from "../../../lib/types/nope/nopeObservable.interface";
import { IPLCModule } from "../../generic-plc/type/interfaces";
import { IMQTTClientModule } from "../../mqtt-connector/types/interfaces";
import {
IConnectorReplicaProcessModuleToBaseModule,
IProcessModuleBaseInformation,
IProcessModuleProcessResult,
IWaMOProcessModule,
IWaMOProcessModuleDescription
} from "../types/interfaces";
export interface PLC extends IGenericNopeModule, IPLCModule {}
export interface MQTT extends IGenericNopeModule, IMQTTClientModule {}
@injectable()
export class WaMOProcessModule
extends InjectableNopeBaseModule
implements IWaMOProcessModule {
@exportProperty({
mode: "publish",
topic: "config",
schema: {}
})
public config = new NopeObservable<IWaMOProcessModuleDescription>();
@exportProperty({
mode: "publish",
topic: "baseInformation",
schema: {}
})
public baseInformation = new NopeObservable<IProcessModuleBaseInformation>();
@exportProperty({
mode: "publish",
topic: "processResult",
schema: {}
})
public processResult = new NopeObservable<IProcessModuleProcessResult>();
@exportProperty({
mode: "publish",
topic: "connector",
schema: {}
})
public connector = new NopeObservable<IConnectorReplicaProcessModuleToBaseModule>();
/**
* Flag, to show whether the system is connected or not.
*
* @memberof WaMOBaseModule
*/
@exportProperty({
mode: "publish",
topic: "connected",
schema: {
type: "boolean"
}
})
public connected = new NopeObservable<boolean>();
protected _mqtt: MQTT;
protected _plc: PLC;
protected _observers: INopeObserver[] = [];
/**
* Helper Function to reset all connections.
*/
@exportMethod({
paramsHasNoCallback: true
})
public async reset(): Promise<void> {
if (this._mqtt) {
await this._mqtt.dispose();
}
if (this._plc) {
await this._plc.dispose();
}
for (const obs of this._observers) {
obs.unsubscribe();
}
this._observers = [];
}
/**
* Dispose Function, which will shutdown everything correctly.
* All Subscription will be unsubscribed, the Client will be closed
* etc...
*/
public async dispose(): Promise<void> {
await this.reset();
await super.dispose();
}
/**
* The Logger of the Module.
*
* @private
* @memberof BeckhoffPlc
*/
private _logger = getNopeLogger("wamo-base-module");
public async init(
options: IWaMOProcessModuleDescription,
waitForInitialized = true
): Promise<void> {
// Define the Author.
this.author = {
forename: "Martin",
mail: "m.karkowski@zema.de",
surename: "karkowski"
};
this.description =
"Interface for a WaMO-Process-Module. It forwards all information of the Connector to";
this.version = {
date: new Date("30.12.2020"),
version: 1
};
const _this = this;
this._logger = getNopeLogger("wamo-process-module-" + this.identifier);
// Define a Getter => which will
// extract all values.
this.connected.getter = (value) => {
return (
this._mqtt?.connected.getContent() &&
this._plc?.initialized.getContent()
);
};
// Define a Beavior if the ads Options have been setted.
this.config.subscribe(async (options) => {
try {
// Reset the System
await _this.reset();
// create a MQTT-Connector:
const broker = `mqtt://${options.mqttUri}:1883`;
const mqttParams = [
broker,
options.mqtt.settings,
// Waiting for the Connection.
true
];
// create a MQTT-Connector:
this._mqtt = await _this._dispatcher.generateInstance<MQTT>({
identifier: options.mqtt.instance,
params: mqttParams,
type: "MQTTClientModule"
});
// Subscribe to the connection status:
this._mqtt.connected.subscribe(() => {
_this.connected.forcePublish();
});
// create a PLC Connector:
this._plc = await _this._dispatcher.generateInstance<PLC>({
identifier: options.beckhoff.instance,
params: options.beckhoff.options,
type: "BeckhoffPlc"
});
// Subscribe to the Initialized PLC.
this._plc.initialized.subscribe((rdy) => {
// Only if the PLC is connected => Perform an update
// and adapt the Information.
if (rdy) {
try {
const dict = {
active: "bInputProzessmodulAnBasisAktiv",
result: "bInputProzessmodulAnBasisErgebnisIO",
error: "bInputProzessmodulAnBasisFehler"
};
// Publish the whole Connector:
_this._mqtt
.subscribe(
"+/processModule/connectorReplica",
(value) => {
_this.connector.setContent(value);
},
{
format: "json"
}
)
.catch((error) => _this._logger.error(error));
// Define every Element initially.
for (const key in dict) {
// Set the PLC.
_this._plc.dynamicInstanceProperties[
"input.GVL_IOS." + dict[key]
].subscribe((value) => {
_this.connector.getContent()[key] = value;
_this.connector.forcePublish();
});
// Publish on MQTT.
_this._mqtt
.subscribe(
`+/processModule/connectorReplica/${key}`,
(value) => {
_this.connector.getContent()[key] = value;
_this.connector.forcePublish();
},
{
format: "json"
}
)
.catch((error) => _this._logger.error(error));
}
_this.connected.forcePublish();
} catch (e) {
_this._logger.error("Failed to subscribe to the PLC");
_this._logger.error(e);
}
}
});
// Not requried
// this._plc.initialized.forcePublish();
} catch (e) {
_this._logger.error("Failed to create a Base-Module.");
_this._logger.error(e);
throw e;
}
});
await super.init();
// Assign the Default Values:
this.baseInformation.setContent({
logical: {
currentState: "initialize",
usesIosOfConnector: true,
error: null
},
physical: {
rfidTagId: options.id
}
});
this.connector.subscribe((value) => {
_this._logger.info("changed replica-port to", value);
});
this.connector.setContent({
active: false,
error: false,
result: false
});
// Assign the Config => to update the Values.
this.config.setContent(options);
// If required, wait for the Job.
if (waitForInitialized) {
await this.connected.waitFor((value) => value === true);
}
}
}

View File

@ -2,12 +2,13 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-12-29 15:02:56
* @modify date 2020-12-30 12:38:32
* @modify date 2021-01-04 16:44:16
* @desc [description]
*/
import { INopeObservable } from "../../../lib/types/nope/nopeObservable.interface";
import { IBeckhoffOptions } from "../../mod-Beckhoff-PLC-Interface/type/interfaces";
import { IMqttSettings } from "../../mqtt-connector/types/interfaces";
export interface IAbstractWaMOModuleDescription {
type: "process" | "feeder" | "base" | "converter";
@ -16,9 +17,18 @@ export interface IAbstractWaMOModuleDescription {
mqttUri: string;
}
export interface IWaMOProcessModuleDescription {
export interface IWaMOProcessModuleDescription
extends IAbstractWaMOModuleDescription {
capability: string;
type: "process";
beckhoff: {
instance: string;
options: IBeckhoffOptions;
};
mqtt: {
settings: IMqttSettings;
instance: string;
};
}
export interface IWaMOFeedingModuleDescription
@ -83,6 +93,37 @@ export interface IBaseModuleInformation {
};
}
export interface IProcessModuleBaseInformation {
physical: {
rfidTagId: string; // RFID-Tag des Prozess-Moduls
};
logical: {
// Fehler-Code der Anlage -> Zuordnung zu einer Fehler-Datenbank,
error?: {
code?: number;
description: string;
};
currentState:
| "initialize"
| "active"
| "waiting"
| "processing"
| "error"
| "standby";
usesIosOfConnector: boolean; // Toggle, der angibt ob die IOS des Steckverbinders genutzt werden
};
planner?: {
description: any; // Verhaltensmodell (für den Planner / die Analyse)
currentState: any; // Aktueller System Zustand (Tokens des Petri-Nets)
};
}
export interface IProcessModuleProcessResult {
start: string;
end: string;
parameters: any[];
}
export interface IBaseModuleProductInformation {
id: string; // ID des Produktes
batch: string; // Batch-Nummer
@ -96,17 +137,31 @@ export interface IBaseModuleProcessInformation {
requiredParts: Array<string>; // Benötigte Teile für den Prozess
}
export interface IBaseModuleConnectorReplica {
export interface IConnectorReplicaBaseModuleToProcessModule {
start: boolean;
release: boolean;
automatic: boolean;
}
export interface IConnectorReplicaProcessModuleToBaseModule {
active: boolean;
result: boolean;
error: boolean;
}
export interface IWaMOBaseModule {
connected: INopeObservable<boolean>;
config: INopeObservable<IWaMOBaseModuleDescription>;
baseInformation: INopeObservable<IBaseModuleInformation>;
productInformation: INopeObservable<IBaseModuleProductInformation>;
processInformation: INopeObservable<IBaseModuleProcessInformation>;
connectorReplica: INopeObservable<IBaseModuleConnectorReplica>;
connector: INopeObservable<IConnectorReplicaBaseModuleToProcessModule>;
}
export interface IWaMOProcessModule {
connected: INopeObservable<boolean>;
config: INopeObservable<IWaMOProcessModuleDescription>;
baseInformation: INopeObservable<IProcessModuleBaseInformation>;
connector: INopeObservable<IConnectorReplicaProcessModuleToBaseModule>;
}
export interface IWaMOCarrierMapper {