unetmsg: add unet pub/sub message broker based on ubus
authorFelix Fietkau <nbd@nbd.name>
Fri, 7 Mar 2025 17:20:23 +0000 (18:20 +0100)
committerFelix Fietkau <nbd@nbd.name>
Sun, 9 Mar 2025 15:44:52 +0000 (16:44 +0100)
This service automatically establishes connections to any hosts that are members
of the same unet network, and allows publish/subscribe exchanges via ubus channels.

Signed-off-by: Felix Fietkau <nbd@nbd.name>
package/network/services/unetmsg/Makefile [new file with mode: 0644]
package/network/services/unetmsg/files/etc/init.d/unetmsg [new file with mode: 0755]
package/network/services/unetmsg/files/usr/sbin/unetmsgd [new file with mode: 0755]
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc [new file with mode: 0644]
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc [new file with mode: 0644]
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc [new file with mode: 0644]
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc [new file with mode: 0644]
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/utils.uc [new file with mode: 0644]

diff --git a/package/network/services/unetmsg/Makefile b/package/network/services/unetmsg/Makefile
new file mode 100644 (file)
index 0000000..eff2089
--- /dev/null
@@ -0,0 +1,35 @@
+#
+# Copyright (C) 2025 OpenWrt.org
+#
+# This is free software, licensed under the GNU General Public License v2.
+# See /LICENSE for more information.
+#
+
+include $(TOPDIR)/rules.mk
+
+PKG_NAME:=unetmsg
+PKG_RELEASE:=$(AUTORELEASE)
+
+PKG_LICENSE:=GPL-2.0
+PKG_MAINTAINER:=Felix Fietkau <nbd@nbd.name>
+
+include $(INCLUDE_DIR)/package.mk
+
+define Package/unetmsg
+  SECTION:=utils
+  CATEGORY:=Utilities
+  TITLE:=unet network pub/sub message broker
+  DEPENDS:=+ucode +ucode-mod-socket \
+       +ucode-mod-ubus +ucode-mod-uloop \
+       +ucode-mod-fs
+endef
+
+define Build/Compile
+       :
+endef
+
+define Package/unetmsg/install
+       $(CP) ./files/* $(1)/
+endef
+
+$(eval $(call BuildPackage,unetmsg))
diff --git a/package/network/services/unetmsg/files/etc/init.d/unetmsg b/package/network/services/unetmsg/files/etc/init.d/unetmsg
new file mode 100755 (executable)
index 0000000..0960be3
--- /dev/null
@@ -0,0 +1,14 @@
+#!/bin/sh /etc/rc.common
+# Copyright (c) 2021 OpenWrt.org
+
+START=50
+
+USE_PROCD=1
+PROG=/usr/sbin/unetmsgd
+
+start_service() {
+       procd_open_instance
+       procd_set_param command "$PROG"
+       procd_set_param respawn
+       procd_close_instance
+}
diff --git a/package/network/services/unetmsg/files/usr/sbin/unetmsgd b/package/network/services/unetmsg/files/usr/sbin/unetmsgd
new file mode 100755 (executable)
index 0000000..aa21053
--- /dev/null
@@ -0,0 +1,68 @@
+#!/usr/bin/env ucode
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
+ */
+'use strict';
+import * as libubus from "ubus";
+import * as uloop from "uloop";
+import * as unetmsg_core from "unetmsg.unetmsgd";
+
+uloop.init();
+let ubus = libubus.connect();
+if (!ubus) {
+       warn(`Failed to connect to ubus\n`);
+       exit(1);
+}
+
+let core = unetmsg_core.init(ubus, true);
+
+function update_acl() {
+       let data = ubus.call(libubus.SYSTEM_OBJECT_ACL, "query");
+       core.acl_set(data.acl);
+}
+
+let obj = ubus.publish("unetmsg", {
+       channel: {
+               args: {},
+               call: function(req) {
+                       if (!core.client.new(req))
+                               return libubus.STATUS_INVALID_ARGUMENT;
+
+                       return 0;
+               }
+       },
+       list: {
+               args: {
+                       name: "",
+               },
+               call: function(req) {
+                       let ret = [];
+                       for (let name in core.publish)
+                               if (req.args.name == null || wildcard(name, req.args.name))
+                                       push(ret, name);
+
+                       return {
+                               id: sort(ret),
+                       };
+               },
+       },
+       request: {
+               args: {
+                       name: "",
+                       type: "",
+                       data: {},
+               },
+               call: function(req) {
+                       try {
+                               core.handle_request(null, req, req.args, true);
+                       } catch (e) {
+                               core.exception(e);
+                       }
+               }
+       }
+});
+
+ubus.subscriber("ubus.acl.sequence", () => update_acl());
+update_acl();
+uloop.run();
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
new file mode 100644 (file)
index 0000000..89c637a
--- /dev/null
@@ -0,0 +1,159 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
+ */
+'use strict';
+import * as libubus from "ubus";
+import * as uloop from "uloop";
+
+function publish(name, request_cb)
+{
+       if (!this.channel)
+               this.connect();
+
+       if (type(name) == "string")
+               name = [ name ];
+
+       for (let cur in name)
+               this.cb_pub[cur] = request_cb;
+
+       if (!this.channel)
+               return;
+
+       this.channel.request("publish", { name });
+}
+
+function subscribe(name, message_cb)
+{
+       if (!this.channel)
+               this.connect();
+
+       if (type(name) == "string")
+               name = [ name ];
+
+       for (let cur in name)
+               this.cb_sub[cur] = message_cb;
+
+       if (!this.channel)
+               return;
+
+       this.channel.request("subscribe", { name });
+}
+
+function send(name, type, data)
+{
+       this.channel.request({
+               method: "message",
+               no_return: true,
+               data: {
+                       name, type, data
+               },
+       });
+}
+
+function default_complete_cb()
+{
+}
+
+function request(name, type, data, data_cb, complete_cb)
+{
+       if (!this.channel)
+               this.connect();
+
+       if (!this.channel)
+               return;
+
+       let req = this.channel.defer({
+               method: "request",
+               data: {
+                       name, type, data
+               },
+               data_cb,
+               cb: complete_cb
+       });
+
+       if (!complete_cb)
+               req.await();
+}
+
+function connect()
+{
+       if (this.channel)
+               return;
+
+       let cl = this;
+       let res = cl.ubus.call({
+               object: "unetmsg",
+               method: "channel",
+               fd_cb: (fd) => {
+                       cl.channel = libubus.open_channel(fd, cl.request_cb, cl.disconnect_cb, cl.timeout);
+               }
+       });
+
+       if (!this.channel) {
+               this.connect_timer.set(1000);
+               return;
+       }
+
+       if (length(this.cb_pub) > 0)
+               this.channel.request("publish", {
+                       name: keys(this.cb_pub)
+               });
+
+       if (length(this.cb_sub) > 0)
+               this.channel.request("subscribe", {
+                       name: keys(this.cb_sub)
+               });
+}
+
+const client_proto = {
+       connect, publish, subscribe, send, request,
+       close: function() {
+               if (this.channel)
+                       this.channel.disconnect();
+               this.connect_timer.cancel();
+               for (let name in keys(this))
+                       delete this[name];
+       }
+};
+
+function handle_request(cl, req)
+{
+       let cb;
+
+       switch (req.type) {
+       case "message":
+               cb = cl.cb_sub[req.args.name];
+               if (cb)
+                       return cb(req);
+               break;
+       case "request":
+               cb = cl.cb_pub[req.args.name];
+               if (cb)
+                       return cb(req);
+       }
+       return 0;
+}
+
+export function open(ubus_conn, timeout)
+{
+       let cl = proto({
+               cb_sub: {},
+               cb_pub: {},
+               ubus: ubus_conn,
+               timeout,
+       }, client_proto);
+
+       cl.request_cb = (req) => {
+               return handle_request(cl, req);
+       };
+
+       cl.disconnect_cb = () => {
+               cl.channel = null;
+               cl.connect_timer.set(100);
+       };
+
+       cl.connect_timer = uloop.timer(1, () => cl.connect());
+
+       return cl;
+};
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
new file mode 100644 (file)
index 0000000..cc971b5
--- /dev/null
@@ -0,0 +1,127 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
+ */
+'use strict';
+import * as libubus from "ubus";
+import { gen_id } from "./utils.uc";
+
+let core;
+let clients = {};
+
+const pubsub_proto = {
+       get_channel: function() {
+               let cl = clients[this.client];
+               if (!cl)
+                       return;
+
+               return cl.chan;
+       }
+};
+
+function client_pubsub(kind, cl, names)
+{
+       if (type(names) != "array")
+               return libubus.STATUS_INVALID_ARGUMENT;
+
+       for (let cur in names) {
+               if (type(cur) != "string")
+                       return libubus.STATUS_INVALID_ARGUMENT;
+       }
+
+       if (!core.acl_check(kind, cl.acl, names))
+               return libubus.STATUS_PERMISSION_DENIED;
+
+       let cl_list = cl[kind];
+       for (let name in names) {
+               if (cl_list[name])
+                       continue;
+
+               cl_list[name] = core.pubsub_add(kind, name, proto({
+                       client: cl.id,
+               }, pubsub_proto));
+       }
+
+       return 0;
+}
+
+function prepare_data(args)
+{
+       return {
+               name: args.name,
+               type: args.type,
+               data: args.data,
+       };
+}
+
+function client_request(cl, req)
+{
+       let args = req.args;
+       let name = args.name;
+
+       if (type(name) != "string" || type(args.type) != "string" || type(args.data) != "object")
+               return libubus.STATUS_INVALID_ARGUMENT;
+
+       let data = prepare_data(req.args);
+       let handle;
+       switch (req.type) {
+       case "message":
+               handle = cl.publish[name];
+           if (!handle)
+                       return libubus.STATUS_INVALID_ARGUMENT;
+               return core.handle_message(handle, data, true);
+       case "request":
+               handle = cl.subscribe[name];
+           if (!handle)
+                       return libubus.STATUS_INVALID_ARGUMENT;
+               return core.handle_request(handle, req, data, true);
+       }
+}
+
+function client_cb(cl, req)
+{
+       let args = req.args;
+       switch (req.type) {
+       case "publish":
+       case "subscribe":
+               return client_pubsub(req.type, cl, args.name);
+       case "message":
+       case "request":
+               return client_request(cl, req);
+       }
+}
+
+function client_disconnect(id)
+{
+       let cl = clients[id];
+       if (!cl)
+               return;
+
+       for (let kind in [ "publish", "subscribe" ])
+               for (let name, data in cl[kind])
+                       core.pubsub_del(kind, name, data);
+
+       delete clients[id];
+}
+
+export function new(req)
+{
+       let id = gen_id();
+       let acl = req.info.acl;
+       let client = {
+               id, acl,
+               publish: {},
+               subscribe: {},
+       };
+       let cb = (req) => client_cb(client, req);
+       let disconnect_cb = () => client_disconnect(id);
+       client.chan = req.new_channel(cb, disconnect_cb);
+       clients[id] = client;
+
+       return client;
+};
+
+export function set_core(_core)
+{
+       core = _core;
+};
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
new file mode 100644 (file)
index 0000000..91829f7
--- /dev/null
@@ -0,0 +1,446 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
+ */
+'use strict';
+import * as libubus from "ubus";
+import * as uloop from "uloop";
+import * as socket from "socket";
+import { gen_id } from "./utils.uc";
+
+let core, ubus;
+let local_id = gen_id();
+let ev_listener, sub;
+
+let networks = {};
+
+const USYNC_PORT = 51818;
+
+const pubsub_proto = {
+       get_channel: function() {
+               let net = networks[this.network];
+               if (!net)
+                       return;
+
+               let sock_data = net.tx_channels[this.name];
+               if (!sock_data)
+                       return;
+
+               return sock_data.channel;
+       },
+       get_response_data: function(data) {
+               data.network = this.network,
+               data.host = this.name;
+               return data;
+       }
+};
+
+function network_socket_close(data)
+{
+       if (!data)
+               return;
+
+       if (data.timer)
+               data.timer.cancel();
+       data.channel.disconnect();
+       data.socket.close();
+}
+
+function network_socket_handle_request(sock_data, req)
+{
+       let net = networks[sock_data.network];
+       if (!net)
+               return;
+
+       let msgtype = req.type;
+       let host = sock_data.name;
+       let network = sock_data.network;
+       let args = { ...req.args, host, network };
+       switch (msgtype) {
+       case "publish":
+       case "subscribe":
+               let list = sock_data[msgtype];
+               let name = args.name;
+               if (!name)
+                       return;
+               if (args.enabled) {
+                       if (list[name])
+                               return 0;
+
+                       core["remote_" + msgtype][name] ??= {};
+                       core["remote_" + msgtype][name][host] = proto({
+                               network: sock_data.network,
+                               name: host,
+                       }, pubsub_proto);
+                       list[name] = true;
+               } else {
+                       if (!list[name])
+                               return 0;
+                       delete core["remote_" + msgtype][name][host];
+                       delete list[name];
+               }
+               break;
+       case "request":
+               return core.handle_request(null, req, args);
+       case "message":
+               core.handle_message(null, args);
+               return 0;
+       }
+
+       return 0;
+}
+
+function network_auth_token(net, host, id)
+{
+       let auth_data = ubus.call("unetd", "token_create", {
+               network: net,
+               target: host,
+               data: { id }
+       });
+
+       if (!auth_data)
+               return;
+
+       return auth_data.token;
+}
+
+function network_auth_valid(host, id, token)
+{
+       if (!token)
+               return;
+
+       let data = ubus.call("unetd", "token_parse", { token });
+       if (!data)
+               return;
+
+       if (data.host != host)
+               return;
+
+       if (data.user != "root")
+               return;
+
+       data = data.data;
+       if (data.id != id)
+               return;
+
+       return true;
+}
+
+
+function network_check_auth(sock_data, info)
+{
+       if (!network_auth_valid(sock_data.name, sock_data.id, info.token))
+               return;
+
+       let net = networks[sock_data.network];
+       if (!net)
+               return;
+
+       network_socket_close(net.rx_channels[sock_data.name]);
+       if (sock_data.timer)
+               sock_data.timer.cancel();
+       sock_data.auth = true;
+       net.rx_channels[sock_data.name] = sock_data;
+       core.dbg(`Incoming connection from ${sock_data.name} established\n`);
+       if (!net.tx_channels[sock_data.name])
+               net.timer.set(100);
+}
+
+function network_accept(net, sock, addr)
+{
+       let src = addr.address;
+       let name;
+
+       for (let cur_name, data in net.peers)
+               if (data.address == src)
+                       name = cur_name;
+
+       if (!name) {
+               core.dbg(`No peer found for address ${src}\n`);
+               sock.close();
+               return;
+       }
+
+       let sock_data = {
+               network: net.name,
+               socket: sock,
+               publish: {},
+               subscribe: {},
+               name,
+       };
+
+       let cb = (req) => {
+               if (!sock_data.auth) {
+                       if (req.type == "hello") {
+                               sock_data.id = req.args.id;
+                               return;
+                       }
+                       if (req.type == "auth")
+                               network_check_auth(sock_data, req.args);
+
+                       if (!sock_data.auth) {
+                               warn(`Auth failed\n`);
+                               network_socket_close(sock_data);
+                               return 0;
+                       }
+
+                       let token = network_auth_token(net.name, name, req.args.id);
+                       if (!token) {
+                               warn(`Failed to generate auth reply token\n`);
+                               return 0;
+                       }
+
+                       req.reply({ token }, true);
+
+                       return 0;
+               }
+
+               return network_socket_handle_request(sock_data, req);
+       };
+
+       let disconnect_cb = (req) => {
+               let net = networks[sock_data.network];
+               let cur_data = net.rx_channels[sock_data.name];
+               if (cur_data == sock_data)
+                       delete net.rx_channels[sock_data.name];
+
+               network_socket_close(sock_data);
+       };
+
+       sock_data.id = gen_id();
+       sock_data.timer = uloop.timer(10 * 1000, () => {
+               network_socket_close(sock_data);
+       });
+       sock_data.channel = libubus.open_channel(sock, cb, disconnect_cb);
+       sock_data.channel.request({
+               method: "hello",
+               data: { id: sock_data.id },
+               no_return: true,
+       });
+}
+
+function network_tx_socket_close(data)
+{
+       if (!data)
+               return;
+
+       core.dbg(`Outgoing connection to ${data.name} closed\n`);
+       let net = networks[data.network];
+       if (net && net.tx_channels[data.name] == data)
+               delete net.tx_channels[data.name];
+
+       network_socket_close(data);
+}
+
+function network_open_channel(net, name, peer)
+{
+       network_tx_socket_close(net.tx_channels[name]);
+
+       let sock_data = {
+               network: net.name,
+               name
+       };
+
+       let addr = socket.sockaddr({
+               address: peer.address,
+               port: USYNC_PORT
+       });
+       if (!addr)
+               return;
+
+       let sock = socket.create(socket.AF_INET6, socket.SOCK_STREAM | socket.SOCK_NONBLOCK);
+       if (!sock)
+               return;
+
+       core.dbg(`Try to connect to ${name}\n`);
+       sock.connect(addr);
+       let auth_data_cb = (msg) => {
+               if (!network_auth_valid(sock_data.name, sock_data.id, msg.token))
+                       return;
+
+               sock_data.auth = true;
+               core.dbg(`Outgoing connection to ${name} established\n`);
+
+               for (let kind in [ "publish", "subscribe" ])
+                       for (let name in core[kind])
+                               sock_data.channel.request({
+                                       method: kind,
+                                       data: { name, enabled: true },
+                                       no_return: true,
+                               });
+       };
+       let auth_cb = () => {
+               if (!sock_data.auth)
+                       network_tx_socket_close(sock_data);
+       };
+
+       let cb = (req) => {
+               if (sock_data.auth)
+                       return 0;
+
+               if (req.type != "hello") {
+                       network_tx_socket_close(sock_data);
+                       return 0;
+               }
+
+               let token = network_auth_token(net.name, name, req.args.id);
+               if (!token) {
+                       network_tx_socket_close(sock_data);
+                       return 0;
+               }
+
+               sock_data.request = sock_data.channel.defer({
+                       method: "auth",
+                       data: { token },
+                       data_cb: auth_data_cb,
+                       cb: auth_cb,
+               });
+
+               return 0;
+       };
+
+       let disconnect_cb = (req) => {
+               let net = networks[sock_data.network];
+               let cur_data = net.tx_channels[sock_data.name];
+               if (cur_data == sock_data)
+                       delete net.rx_channels[sock_data.name];
+
+               network_tx_socket_close(sock_data);
+       };
+
+       sock_data.socket = sock;
+       sock_data.channel = libubus.open_channel(sock, cb, disconnect_cb);
+       net.tx_channels[name] = sock_data;
+}
+
+function network_connect_peers(net)
+{
+       let n_pending = 0;
+
+       for (let name, data in net.peers) {
+               let chan = net.tx_channels[name];
+               if (chan && chan.auth)
+                       continue;
+
+               network_open_channel(net, name, data);
+               n_pending++;
+       }
+
+       if (n_pending)
+               net.timer.set(10 * 1000);
+}
+
+function network_open(name, info)
+{
+       let net = info;
+
+       net.socket = socket.listen(net.local_address, USYNC_PORT, {
+               family: socket.AF_INET6,
+               socktype: socket.SOCK_STREAM,
+               flags: socket.AI_NUMERICHOST,
+       }, null, true);
+
+       if (!net.socket) {
+               warn(`Failed to open socket: ${socket.error()}\n`);
+               return;
+       }
+
+       net.name = name;
+       net.rx_channels = {};
+       net.tx_channels = {};
+
+       net.socket.setopt(socket.SOL_TCP, socket.TCP_USER_TIMEOUT, 30 * 1000);
+
+       let cb = () => {
+               let addr = {};
+               let sock = net.socket.accept(addr);
+               if (sock)
+                       network_accept(net, sock, addr);
+       };
+
+       net.handle = uloop.handle(net.socket.fileno(), cb, uloop.ULOOP_READ);
+       net.timer = uloop.timer(100, () => network_connect_peers(net));
+
+       networks[name] = net;
+}
+
+function network_close(name)
+{
+       let net = networks[name];
+       net.timer.cancel();
+       net.handle.delete();
+       net.socket.close();
+       delete networks[name];
+}
+
+function network_update(name, info)
+{
+       let net = networks[name];
+       if (!net)
+               return;
+
+       if (net.local_host != info.local_host ||
+           net.local_address != info.local_address) {
+               network_close(name);
+               network_open(name, info);
+               return;
+       }
+
+       net.peers = info.peers;
+       net.timer.set(100);
+}
+
+function unetd_network_update()
+{
+       let data = ubus.call("unetd", "network_get");
+       if (!data || !data.networks)
+               return;
+
+       for (let name, info in data.networks) {
+               core.dbg(`Add network ${name}\n`);
+               if (networks[name])
+                       network_update(name, info);
+               else
+                       network_open(name, info);
+       }
+
+       for (let name in networks)
+               if (!data.networks)
+                       network_close(name);
+}
+
+function unetd_cb(msg)
+{
+       if (msg.type == "network_update")
+               unetd_network_update();
+       return 0;
+}
+
+export function pubsub_set(kind, name, enabled)
+{
+       for (let net_name, net in networks) {
+               for (let host_name, chan in net.tx_channels) {
+                       if (!chan.auth)
+                               continue;
+
+                       chan.channel.request({
+                               method: kind,
+                               data: { name, enabled },
+                               no_return: true,
+                       });
+               }
+       }
+};
+
+export function init(_core)
+{
+       core = _core;
+       ubus = core.ubus;
+       sub = ubus.subscriber(unetd_cb);
+       unetd_network_update();
+       ev_listener = ubus.listener("ubus.object.add", (event, msg) => {
+               if (msg.path == "unetd")
+                       sub.subscribe(msg.path);
+       });
+       sub.subscribe("unetd");
+};
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
new file mode 100644 (file)
index 0000000..62b4138
--- /dev/null
@@ -0,0 +1,228 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
+ */
+'use strict';
+import * as client from "./unetmsgd-client.uc";
+import * as remote from "./unetmsgd-remote.uc";
+import { gen_id } from "./utils.uc";
+
+function __acl_check(list, name)
+{
+       for (let cur in list)
+               if (wildcard(name, cur, true))
+                       return true;
+}
+
+function acl_check(acl_type, info, names)
+{
+       let acl = this.acl;
+
+       if (info.user == "root")
+               return true;
+
+       let list = acl[acl_type][info.user] ?? [];
+       if (info.group) {
+               let list2 = acl[acl_type][":" + info.group];
+               if (list2)
+                       list = [ ...list, ...list2 ];
+       }
+
+       for (let name in names)
+               if (!__acl_check(list, name))
+                       return;
+
+       return true;
+}
+
+function new_handle(list, name, data)
+{
+       let id = gen_id();
+       data._id = id;
+       list[name] ??= {};
+       list[name][id] = data;
+       return data;
+}
+
+function pubsub_add(kind, name, data)
+{
+       let list = this[kind];
+       if (!length(list[name])) {
+               list[name] = {};
+               remote.pubsub_set(kind, name, true);
+       }
+       return new_handle(this[kind], name, data);
+}
+
+function pubsub_del(kind, name, data)
+{
+       let list = this[kind][name];
+       delete list[data._id];
+       if (!length(list))
+               remote.pubsub_set(kind, name, false);
+}
+
+function get_handles(handle, local, remote)
+{
+       let handles = [];
+
+       for (let cur_id, cur in local) {
+               if (handle) {
+                       if (handle.id == cur_id)
+                               continue;
+                       if (handle.client && handle.client == cur.client)
+                               continue;
+               }
+
+               push(handles, cur);
+       }
+
+       if (!remote)
+               return handles;
+
+       for (let cur_id, cur in remote)
+               push(handles, cur);
+
+       return handles;
+}
+
+function handle_request(handle, req, data, remote)
+{
+       let name = data.name;
+       let local = this.publish[name];
+       if (remote)
+               remote = this.remote_publish[name];
+       let handles = get_handles(handle, local, remote);
+
+       let context = {
+               pending: length(handles),
+               req
+       };
+
+       if (!context.pending)
+               return 0;
+
+       req.defer();
+       let cb = (ret) => {
+               if (--context.pending > 0)
+                       return;
+               req.reply();
+       };
+
+       for (let cur in handles) {
+               if (!cur || !cur.get_channel) {
+                       continue;
+               }
+               let chan = cur.get_channel();
+               if (!chan) {
+                       cb();
+                       continue;
+               }
+
+               let cur_handle = cur;
+               let data_cb = (msg) => {
+                       if (cur_handle.get_response_data)
+                               msg = cur.get_response_data(msg);
+                       req.reply(msg, true);
+               };
+
+               chan.defer({
+                       method: "request",
+                       data, cb, data_cb
+               });
+       }
+}
+
+function handle_message(handle, data, remote)
+{
+       let name = data.name;
+       let local = this.subscribe[name];
+       if (remote)
+               remote = this.remote_subscribe[name];
+       let handles = get_handles(handle, local, remote);
+       for (let cur in handles) {
+               if (!cur || !cur.get_channel)
+                       continue;
+
+               let chan = cur.get_channel();
+               if (!chan)
+                       continue;
+
+               chan.request({
+                       method: "message",
+                       no_return: true,
+                       data,
+               });
+       }
+       return 0;
+}
+
+function add_acl(type, user, data)
+{
+       if (!data || !user)
+               return;
+
+       type[user] ??= [];
+       let list = type[user];
+       for (let cur in data)
+               if (index(list, data) < 0)
+                       push(list, cur);
+}
+
+function acl_set(acl_data)
+{
+       let acl = this.acl = {
+               publish: {},
+               subscribe: {},
+       };
+
+       for (let cur in acl_data) {
+               if (cur.obj != "unetmsg" || !cur.acl)
+                       continue;
+
+               if (cur.group)
+                       cur.group = ":" + cur.group;
+
+               for (let user in [ cur.user, cur.group ]) {
+                       add_acl(acl.publish, user, cur.acl.publish);
+                       add_acl(acl.subscribe, user, cur.acl.publish);
+                       add_acl(acl.subscribe, user, cur.acl.subscribe);
+               }
+       }
+};
+
+const core_proto = {
+       acl_check,
+       acl_set,
+       pubsub_add,
+       pubsub_del,
+       handle_request,
+       handle_message,
+       dbg: function(msg) {
+               if (this.debug_enabled)
+                       warn(msg);
+       },
+       exception: function(e) {
+               this.dbg(`Exception: ${e}\n${e.stacktrace[0].context}`);
+       }
+};
+
+export function init(ubus, debug_enabled)
+{
+       let data = proto({
+               clients: {},
+               publish: {},
+               subscribe: {},
+               remote_publish: {},
+               remote_subscribe: {},
+               client,
+               remote,
+               ubus,
+               debug_enabled
+       }, core_proto);
+
+       client.set_core(data);
+       remote.init(data);
+
+       return data;
+};
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/utils.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/utils.uc
new file mode 100644 (file)
index 0000000..6b3b3ae
--- /dev/null
@@ -0,0 +1,12 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
+ */
+'use strict';
+import { open } from "fs";
+
+export function gen_id()
+{
+       let id = open("/dev/urandom").read(12);
+       return join("", map(split(id, ""), (v) => sprintf("%02x", ord(v))));
+};