Initial Release
authorSteven Barth <steven@midlink.org>
Mon, 24 Aug 2015 08:20:32 +0000 (10:20 +0200)
committerSteven Barth <steven@midlink.org>
Mon, 24 Aug 2015 08:52:42 +0000 (10:52 +0200)
20 files changed:
.gitignore [new file with mode: 0644]
.gitmodules [new file with mode: 0644]
CMakeLists.txt [new file with mode: 0644]
LICENSE [new file with mode: 0644]
README.md [new file with mode: 0644]
libubox [new submodule]
src/client.c [new file with mode: 0644]
src/client.h [new file with mode: 0644]
src/groups.c [new file with mode: 0644]
src/groups.h [new file with mode: 0644]
src/igmp.c [new file with mode: 0644]
src/mld.c [new file with mode: 0644]
src/mrib.c [new file with mode: 0644]
src/mrib.h [new file with mode: 0644]
src/omcproxy.c [new file with mode: 0644]
src/omcproxy.h [new file with mode: 0644]
src/proxy.c [new file with mode: 0644]
src/proxy.h [new file with mode: 0644]
src/querier.c [new file with mode: 0644]
src/querier.h [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..ab70e7e
--- /dev/null
@@ -0,0 +1,17 @@
+.project
+.cproject
+CMakeCache.txt
+CMakeFiles
+CMakeScripts
+cmake_install.cmake
+Makefile
+omcproxy
+CTestTestfile.cmake
+Testing
+coverage.info
+test_ifgroup
+test_rib
+coverage
+.settings
+install_manifest.txt
+*~
diff --git a/.gitmodules b/.gitmodules
new file mode 100644 (file)
index 0000000..936f1cb
--- /dev/null
@@ -0,0 +1,3 @@
+[submodule "libubox"]
+       path = libubox
+       url = http://git.openwrt.org/project/libubox.git
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644 (file)
index 0000000..00d6b14
--- /dev/null
@@ -0,0 +1,28 @@
+cmake_minimum_required(VERSION 2.8.8)
+
+project(omcproxy C)
+
+set(CMAKE_SHARED_LIBRARY_LINK_C_FLAGS "")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -std=c99")
+
+add_definitions(-D_GNU_SOURCE -Wall -Wno-gnu)
+
+if(${L_LEVEL})
+  add_definitions(-DL_LEVEL=${L_LEVEL})
+endif(${L_LEVEL})
+
+if(WITH_LIBUBOX)
+  add_definitions(-Wextra)
+  set(PLATFORM_LINK ${PLATFORM_LINK} ubox)
+else (WITH_LIBUBOX)
+  add_definitions(-Dtypeof=__typeof)
+  include_directories(BEFORE .)
+  set(PLATFORM_SOURCE ${PLATFORM_SOURCE} libubox/uloop.c libubox/avl.c libubox/blobmsg.c libubox/blob.c)
+endif(WITH_LIBUBOX)
+
+add_executable(omcproxy src/client.c src/mrib.c src/querier.c src/groups.c src/igmp.c src/mld.c src/proxy.c src/omcproxy.c ${PLATFORM_SOURCE})
+target_link_libraries(omcproxy ${PLATFORM_LINK})
+
+install(TARGETS omcproxy DESTINATION sbin/)
+
+
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..d645695
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..b83fe9e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,30 @@
+# omcproxy - Embedded IGMPv3 and MLDv2 proxy
+
+omcproxy is an IGMPv3 and MLDv2 multicast proxy for use in embedded Linux
+devices like routers. It is small in size and can be compiled to <40 KB.
+
+It is partly based on code of https://github.com/Oryon/pimbd
+
+## Specifications and Features
+
+1. Source-Specific Multicast Querier
+       - MLDv2 querier (based on RFC 3810)
+       - IGMPv3 querier (based on RFC 3376)
+
+2. Multicast Proxying (based on RFC 4605)
+       - Kernel-space multicast routing
+       - Multiple instances support
+       - Address-scope specific proxying
+       
+
+## Compiling
+
+omcproxy uses libubox as submodule, be sure to clone this git repository
+with --recursive or run: "git submodule init; git submodule update"
+after cloning. If you are already using libubox as a shared library
+just pass -DWITH_LIBUBOX=1 to cmake.
+
+omcproxy uses cmake:
+- To prepare a Makefile use:  "cmake ." 
+- To build / install use: "make" / "make install" afterwards.
+- To build DEB or RPM packages use: "make package" afterwards.
diff --git a/libubox b/libubox
new file mode 160000 (submodule)
index 0000000..e88d816
--- /dev/null
+++ b/libubox
@@ -0,0 +1 @@
+Subproject commit e88d816d6e462180f0337565e04e36be58a63309
diff --git a/src/client.c b/src/client.c
new file mode 100644 (file)
index 0000000..5d71adf
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+#include <alloca.h>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <linux/mroute6.h>
+#include <libubox/list.h>
+#include <libubox/avl.h>
+
+#include "client.h"
+
+
+// Add / update / remove a client entry for a multicast group
+int client_set(struct client *client, const struct in6_addr *group,
+               bool include, const struct in6_addr sources[], size_t cnt)
+{
+       int family = (IN6_IS_ADDR_V4MAPPED(group)) ? AF_INET : AF_INET6;
+       int sol = (family == AF_INET) ? SOL_IP : SOL_IPV6;
+       char addrbuf[INET6_ADDRSTRLEN];
+       size_t len = sizeof(struct group_filter) + cnt * sizeof(struct sockaddr_storage);
+       struct {
+               struct group_filter f;
+               struct sockaddr_storage s[];
+       } *filter = alloca(len);
+       struct sockaddr_in *in_addr = (struct sockaddr_in*)&filter->f.gf_group;
+       struct sockaddr_in6 *in6_addr = (struct sockaddr_in6*)&filter->f.gf_group;
+
+       inet_ntop(AF_INET6, group, addrbuf, sizeof(addrbuf));
+       L_DEBUG("%s: %s on %d => %s (+%d sources)", __FUNCTION__, addrbuf,
+                       client->ifindex, (include) ? "include" : "exclude", (int)cnt);
+
+       // Construct MSFILTER for outgoing IGMP / MLD
+       memset(filter, 0, len);
+       filter->f.gf_interface = client->ifindex;
+       filter->f.gf_fmode = include ? MCAST_INCLUDE : MCAST_EXCLUDE;
+       filter->f.gf_group.ss_family = family;
+       filter->f.gf_numsrc = cnt;
+
+       if (family == AF_INET)
+               client_unmap(&in_addr->sin_addr, group);
+       else
+               in6_addr->sin6_addr = *group;
+
+       for (size_t i = 0; i < cnt; ++i) {
+               filter->f.gf_slist[i].ss_family = family;
+
+               in_addr = (struct sockaddr_in*)&filter->f.gf_slist[i];
+               in6_addr = (struct sockaddr_in6*)&filter->f.gf_slist[i];
+
+               if (family == AF_INET)
+                       client_unmap(&in_addr->sin_addr, &sources[i]);
+               else
+                       in6_addr->sin6_addr = sources[i];
+       }
+
+       int fd = (family == AF_INET) ? client->igmp_fd : client->mld_fd;
+       setsockopt(fd, sol, MCAST_LEAVE_GROUP, filter, sizeof(struct group_req));
+       if (!include || cnt > 0) {
+               if (setsockopt(fd, sol, MCAST_JOIN_GROUP, filter, sizeof(struct group_req))
+                               && family == AF_INET && errno == ENOBUFS) {
+                       L_WARN("proxy: kernel denied joining multicast group. check igmp_max_memberships?");
+                       return -errno;
+               }
+
+               if (setsockopt(fd, sol, MCAST_MSFILTER, filter, len))
+                       return -errno;
+       }
+       return 0;
+}
+
+// Initialize client-instance
+int client_init(struct client *client, int ifindex)
+{
+       client->igmp_fd = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0);
+       if (client->igmp_fd < 0)
+               return -errno;
+
+       client->mld_fd = socket(AF_INET6, SOCK_DGRAM | SOCK_CLOEXEC, 0);
+       if (client->mld_fd < 0)
+               return -errno;
+
+       client->ifindex = ifindex;
+       return 0;
+}
+
+// Cleanup client-instance
+void client_deinit(struct client *client)
+{
+       if (client->ifindex) {
+               close(client->igmp_fd);
+               close(client->mld_fd);
+               client->igmp_fd = -1;
+               client->mld_fd = -1;
+               client->ifindex = 0;
+       }
+}
diff --git a/src/client.h b/src/client.h
new file mode 100644 (file)
index 0000000..f8fea78
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+#include <stdbool.h>
+#include <stdlib.h>
+#include <netinet/in.h>
+#include "omcproxy.h"
+
+#define PROXY_MAX_SOURCES 1000
+
+
+struct client {
+       int igmp_fd;
+       int mld_fd;
+       int ifindex;
+};
+
+// Register a new interface to proxy
+int client_init(struct client *client, int ifindex);
+
+// Deregister a new interface from proxy
+void client_deinit(struct client *client);
+
+// Set / update / delete a multicast proxy entry
+int client_set(struct client *client, const struct in6_addr *group, bool include,
+               const struct in6_addr sources[], size_t cnt);
+
+// Unmap IPv4 address
+static inline void client_unmap(struct in_addr *addr4, const struct in6_addr *addr6)
+{
+       addr4->s_addr = addr6->s6_addr32[3];
+}
diff --git a/src/groups.c b/src/groups.c
new file mode 100644 (file)
index 0000000..17e2c67
--- /dev/null
@@ -0,0 +1,498 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include "groups.h"
+
+
+// Group comparator for AVL-tree
+static int compare_groups(const void *k1, const void *k2, __unused void *ptr)
+{
+       return memcmp(k1, k2, sizeof(struct in6_addr));
+}
+
+// Remove a source-definition for a group
+static void querier_remove_source(struct group *group, struct group_source *source)
+{
+       --group->source_count;
+       list_del(&source->head);
+       free(source);
+}
+
+// Clear all sources of a certain group
+static void querier_clear_sources(struct group *group)
+{
+       struct group_source *s, *n;
+       list_for_each_entry_safe(s, n, &group->sources, head)
+               querier_remove_source(group, s);
+}
+
+// Remove a group and all associated sources from the group state
+static void querier_remove_group(struct groups *groups, struct group *group, omgp_time_t now)
+{
+       querier_clear_sources(group);
+       group->exclude_until = 0;
+
+       if (groups->cb_update)
+               groups->cb_update(groups, group, now);
+
+       avl_delete(&groups->groups, &group->node);
+       free(group);
+}
+
+// Expire a group and / or its associated sources depending on the current time
+static omgp_time_t expire_group(struct groups *groups, struct group *group,
+               omgp_time_t now, omgp_time_t next_event)
+{
+       struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
+       omgp_time_t llqi = now + cfg->last_listener_query_interval;
+       omgp_time_t llqt = now + (cfg->last_listener_query_interval * cfg->last_listener_query_count);
+
+       // Handle group and source-specific query retransmission
+       struct list_head suppressed = LIST_HEAD_INIT(suppressed);
+       struct list_head unsuppressed = LIST_HEAD_INIT(unsuppressed);
+       struct group_source *s, *s2;
+
+       if (group->next_source_transmit > 0 && group->next_source_transmit <= now) {
+               group->next_source_transmit = 0;
+
+               list_for_each_entry_safe(s, s2, &group->sources, head) {
+                       if (s->retransmit > 0) {
+                               list_move_tail(&s->head, (s->include_until > llqt) ? &suppressed : &unsuppressed);
+                               --s->retransmit;
+                       }
+
+                       if (s->retransmit > 0)
+                               group->next_source_transmit = llqi;
+               }
+       }
+
+       if (group->next_source_transmit > 0 && group->next_source_transmit < next_event)
+               next_event = group->next_source_transmit;
+
+       // Handle group-specific query retransmission
+       if (group->retransmit > 0 && group->next_generic_transmit <= now) {
+               group->next_generic_transmit = 0;
+
+               if (groups->cb_query)
+                       groups->cb_query(groups, &group->addr, NULL, group->exclude_until > llqt);
+
+               --group->retransmit;
+
+               if (group->retransmit > 0)
+                       group->next_generic_transmit = llqi;
+
+               // Skip suppresed source-specific query (RFC 3810 7.6.3.2)
+               list_splice_init(&suppressed, &group->sources);
+       }
+
+       if (group->next_generic_transmit > 0 && group->next_generic_transmit < next_event)
+               next_event = group->next_generic_transmit;
+
+       if (!list_empty(&suppressed)) {
+               if (groups->cb_query)
+                               groups->cb_query(groups, &group->addr, &suppressed, true);
+
+               list_splice(&suppressed, &group->sources);
+       }
+
+       if (!list_empty(&unsuppressed)) {
+               if (groups->cb_query)
+                               groups->cb_query(groups, &group->addr, &unsuppressed, false);
+
+               list_splice(&unsuppressed, &group->sources);
+       }
+
+       // Handle source and group expiry
+       bool changed = false;
+       if (group->exclude_until > 0) {
+               if (group_is_included(group, now)) {
+                       // Leaving exclude mode
+                       group->exclude_until = 0;
+                       changed = true;
+               } else if (group->exclude_until < next_event) {
+                       next_event = group->exclude_until;
+               }
+       }
+
+       list_for_each_entry_safe(s, s2, &group->sources, head) {
+               if (s->include_until > 0) {
+                       if (!source_is_included(s, now)) {
+                               s->include_until = 0;
+                               changed = true;
+                       } else if (s->include_until < next_event) {
+                               next_event = s->include_until;
+                       }
+               }
+
+               if (group->exclude_until == 0 && s->include_until == 0)
+                       querier_remove_source(group, s);
+       }
+
+       if (group->exclude_until == 0 && group->source_count == 0)
+               querier_remove_group(groups, group, now);
+       else if (changed && groups->cb_update)
+               groups->cb_update(groups, group, now);
+
+       return next_event;
+}
+
+// Rearm the global groups-timer if the next event is before timer expiration
+static void rearm_timer(struct groups *groups, int msecs)
+{
+       int remain = uloop_timeout_remaining(&groups->timer);
+       if (remain < 0 || remain >= msecs)
+               uloop_timeout_set(&groups->timer, msecs);
+}
+
+// Expire all groups of a group-state (called by timer as callback)
+static void expire_groups(struct uloop_timeout *t)
+{
+       struct groups *groups = container_of(t, struct groups, timer);
+       omgp_time_t now = omgp_time();
+       omgp_time_t next_event = now + 3600 * OMGP_TIME_PER_SECOND;
+
+       struct group *group, *n;
+       avl_for_each_element_safe(&groups->groups, group, node, n)
+               next_event = expire_group(groups, group, now, next_event);
+
+       rearm_timer(groups, (next_event > now) ? next_event - now : 0);
+}
+
+// Initialize a group-state
+void groups_init(struct groups *groups)
+{
+       avl_init(&groups->groups, compare_groups, false, NULL);
+       groups->timer.cb = expire_groups;
+
+       groups_update_config(groups, false, 10 * OMGP_TIME_PER_SECOND,
+                       125 * OMGP_TIME_PER_SECOND, 2);
+       groups_update_config(groups, true, 10 * OMGP_TIME_PER_SECOND,
+                               125 * OMGP_TIME_PER_SECOND, 2);
+}
+
+// Cleanup a group-state
+void groups_deinit(struct groups *groups)
+{
+       omgp_time_t now = omgp_time();
+       struct group *group, *safe;
+       avl_for_each_element_safe(&groups->groups, group, node, safe)
+               querier_remove_group(groups, group, now);
+       uloop_timeout_cancel(&groups->timer);
+}
+
+// Get group-object for a given group, create if requested
+static struct group* groups_get_group(struct groups *groups,
+               const struct in6_addr *addr, bool *created)
+{
+       struct group *group = avl_find_element(&groups->groups, addr, group, node);
+       if (!group && created && (group = calloc(1, sizeof(*group)))) {
+               group->addr = *addr;
+               group->node.key = &group->addr;
+               avl_insert(&groups->groups, &group->node);
+
+               INIT_LIST_HEAD(&group->sources);
+               *created = true;
+       } else if (created) {
+               *created = false;
+       }
+       return group;
+}
+
+// Get source-object for a given source, create if requested
+static struct group_source* groups_get_source(struct groups *groups,
+               struct group *group, const struct in6_addr *addr, bool *created)
+{
+       struct group_source *c, *source = NULL;
+       group_for_each_source(c, group)
+               if (IN6_ARE_ADDR_EQUAL(&c->addr, addr))
+                       source = c;
+
+       if (!source && created && group->source_count < groups->source_limit &&
+                       (source = calloc(1, sizeof(*source)))) {
+               source->addr = *addr;
+               list_add_tail(&source->head, &group->sources);
+               ++group->source_count;
+               *created = true;
+       } else if (created) {
+               *created = false;
+       }
+
+       return source;
+}
+
+// Update the IGMP/MLD timers of a group-state
+void groups_update_config(struct groups *groups, bool v6,
+               omgp_time_t query_response_interval, omgp_time_t query_interval, int robustness)
+{
+       struct groups_config *cfg = v6 ? &groups->cfg_v6 : &groups->cfg_v4;
+       cfg->query_response_interval = query_response_interval;
+       cfg->query_interval = query_interval;
+       cfg->robustness = robustness;
+       cfg->last_listener_query_count = cfg->robustness;
+       cfg->last_listener_query_interval = 1 * OMGP_TIME_PER_SECOND;
+}
+
+// Update timers for a given group (called when receiving queries from other queriers)
+void groups_update_timers(struct groups *groups,
+               const struct in6_addr *groupaddr,
+               const struct in6_addr *addrs, size_t len)
+{
+       char addrbuf[INET6_ADDRSTRLEN];
+       inet_ntop(AF_INET6, groupaddr, addrbuf, sizeof(addrbuf));
+       struct group *group = groups_get_group(groups, groupaddr, NULL);
+       if (!group) {
+               L_WARN("%s: failed to update timer: no such group %s", __FUNCTION__, addrbuf);
+               return;
+       }
+
+       struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
+       omgp_time_t now = omgp_time();
+       omgp_time_t llqt = now + (cfg->last_listener_query_count * cfg->last_listener_query_interval);
+
+       if (len == 0) {
+               if (group->exclude_until > llqt)
+                       group->exclude_until = llqt;
+       } else {
+               for (size_t i = 0; i < len; ++i) {
+                       struct group_source *source = groups_get_source(groups, group, &addrs[i], NULL);
+                       if (!source) {
+                               L_WARN("%s: failed to update timer: unknown sources for group %s", __FUNCTION__, addrbuf);
+                               continue;
+                       }
+
+                       if (source->include_until > llqt)
+                               source->include_until = llqt;
+               }
+       }
+
+       rearm_timer(groups, llqt - now);
+}
+
+// Update state of a given group (on reception of node's IGMP/MLD packets)
+void groups_update_state(struct groups *groups,
+               const struct in6_addr *groupaddr,
+               const struct in6_addr *addrs, size_t len,
+               enum groups_update update)
+{
+       bool created = false, changed = false;
+       char addrbuf[INET6_ADDRSTRLEN];
+       inet_ntop(AF_INET6, groupaddr, addrbuf, sizeof(addrbuf));
+       L_DEBUG("%s: %s (+%d sources) => %d", __FUNCTION__, addrbuf, (int)len, update);
+
+       struct group *group = groups_get_group(groups, groupaddr, &created);
+       if (!group) {
+               L_ERR("querier_state: failed to allocate group for %s", addrbuf);
+               return;
+       }
+
+       if (created)
+               changed = true;
+
+       omgp_time_t now = omgp_time();
+       omgp_time_t next_event = OMGP_TIME_MAX;
+       struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
+
+       // Backwards compatibility modes
+       if (group->compat_v2_until > now || group->compat_v1_until > now) {
+               if (update == UPDATE_BLOCK)
+                       return;
+
+               if (group->compat_v1_until > now && (update == UPDATE_DONE || update == UPDATE_TO_IN))
+                       return;
+
+               if (update == UPDATE_TO_EX)
+                       len = 0;
+       }
+
+       if (update == UPDATE_REPORT || update == UPDATE_REPORT_V1 || update == UPDATE_DONE) {
+               omgp_time_t compat_until = now + cfg->query_response_interval +
+                               (cfg->robustness * cfg->query_interval);
+
+               if (update == UPDATE_REPORT_V1)
+                       group->compat_v1_until = compat_until;
+               else if (update == UPDATE_REPORT)
+                       group->compat_v2_until = compat_until;
+
+               update = (update == UPDATE_DONE) ? UPDATE_TO_IN : UPDATE_IS_EXCLUDE;
+               len = 0;
+       }
+
+       bool include = group->exclude_until <= now;
+       bool is_include = update == UPDATE_IS_INCLUDE || update == UPDATE_TO_IN || update == UPDATE_ALLOW;
+
+       int llqc = cfg->last_listener_query_count;
+       omgp_time_t mali = now + (cfg->robustness * cfg->query_interval) + cfg->query_response_interval;
+       omgp_time_t llqt = now + (cfg->last_listener_query_interval * llqc);
+
+       // RFC 3810 7.4
+       struct list_head saved = LIST_HEAD_INIT(saved);
+       struct list_head queried = LIST_HEAD_INIT(queried);
+       for (size_t i = 0; i < len; ++i) {
+               bool *create = (include && update == UPDATE_BLOCK) ? NULL : &created;
+               struct group_source *source = groups_get_source(groups, group, &addrs[i], create);
+
+               if (include && update == UPDATE_BLOCK) {
+                       if (source)
+                               list_move_tail(&source->head, &queried);
+               } else {
+                       bool query = false;
+                       if (!source) {
+                               groups_update_state(groups, groupaddr, NULL, 0, false);
+                               L_WARN("querier: failed to allocate source for %s, fallback to ASM", addrbuf);
+                               return;
+                       }
+
+                       if (created)
+                               changed = true;
+                       else if (include && update == UPDATE_TO_EX)
+                               query = true;
+
+                       if (source->include_until <= now && update == UPDATE_SET_IN) {
+                               source->include_until = mali;
+                               changed = true;
+                       } else if (source->include_until > now && update == UPDATE_SET_EX) {
+                               source->include_until = now;
+                               changed = true;
+                       }
+
+                       if (!include && (update == UPDATE_BLOCK || update == UPDATE_TO_EX) &&
+                                       (created || source->include_until > now))
+                               query = true;
+
+                       if ((is_include || (!include && created))) {
+                               if (source->include_until <= now)
+                                       changed = true;
+
+                               source->include_until = (is_include || update == UPDATE_IS_EXCLUDE)
+                                               ? mali : group->exclude_until;
+
+                               if (next_event > mali)
+                                       next_event = mali;
+                       }
+
+                       if (query)
+                               list_move_tail(&source->head, &queried);
+                       else if (update == UPDATE_IS_EXCLUDE || update == UPDATE_TO_EX ||
+                                       update == UPDATE_SET_EX || update == UPDATE_SET_IN)
+                               list_move_tail(&source->head, &saved);
+               }
+       }
+
+       if (update == UPDATE_IS_EXCLUDE || update == UPDATE_TO_EX || update == UPDATE_SET_EX) {
+               if (include || !list_empty(&group->sources))
+                       changed = true;
+
+               querier_clear_sources(group);
+               list_splice(&saved, &group->sources);
+               group->exclude_until = mali;
+
+               if (next_event > mali)
+                       next_event = mali;
+       }
+
+       if (update == UPDATE_SET_IN) {
+               if (!include || !list_empty(&group->sources)) {
+                       changed = true;
+                       next_event = now;
+               }
+
+               querier_clear_sources(group);
+               list_splice(&saved, &group->sources);
+               group->exclude_until = now;
+       }
+
+       // Prepare queries
+       if (update == UPDATE_TO_IN) {
+               struct group_source *source, *n;
+               list_for_each_entry_safe(source, n, &group->sources, head) {
+                       if (source->include_until <= now)
+                               continue;
+
+                       size_t i;
+                       for (i = 0; i < len && !IN6_ARE_ADDR_EQUAL(&source->addr, &addrs[i]); ++i);
+                       if (i == len)
+                               list_move_tail(&source->head, &queried);
+               }
+       }
+
+       if (!list_empty(&queried)) {
+               struct group_source *source;
+               list_for_each_entry(source, &queried, head) {
+                       if (source->include_until > llqt)
+                               source->include_until = llqt;
+
+                       group->next_source_transmit = now;
+                       source->retransmit = llqc;
+               }
+
+               next_event = now;
+               list_splice(&queried, &group->sources);
+       }
+
+       if (!include && update == UPDATE_TO_IN) {
+               if (group->exclude_until > llqt)
+                       group->exclude_until = llqt;
+
+               group->next_generic_transmit = now;
+               group->retransmit = llqc;
+               next_event = now;
+       }
+
+       if (changed && groups->cb_update)
+               groups->cb_update(groups, group, now);
+
+       if (group_is_included(group, now) && group->source_count == 0)
+               next_event = now;
+
+       if (next_event < OMGP_TIME_MAX)
+               rearm_timer(groups, next_event - now);
+
+       if (changed)
+               L_DEBUG("%s: %s => %s (+%d sources)", __FUNCTION__, addrbuf,
+                               (group_is_included(group, now)) ? "included" : "excluded",
+                               (int)group->source_count);
+
+}
+
+// Get group object of a given group
+const struct group* groups_get(struct groups *groups, const struct in6_addr *addr)
+{
+       return groups_get_group(groups, addr, NULL);
+}
+
+// Test if a group (and source) is requested in the current group state
+// (i.e. for deciding if it should be routed / forwarded)
+bool groups_includes_group(struct groups *groups, const struct in6_addr *addr,
+               const struct in6_addr *src, omgp_time_t time)
+{
+       struct group *group = groups_get_group(groups, addr, NULL);
+       if (group) {
+               if (!src && (!group_is_included(group, time) || group->source_count > 0))
+                       return true;
+
+               struct group_source *source = groups_get_source(groups, group, src, NULL);
+               if ((!group_is_included(group, time) && (!source || source_is_included(source, time))) ||
+                               (group_is_included(group, time) && source && source_is_included(source, time)))
+                       return true;
+       }
+       return false;
+}
diff --git a/src/groups.h b/src/groups.h
new file mode 100644 (file)
index 0000000..2ea3073
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <libubox/list.h>
+#include <libubox/avl.h>
+#include <libubox/uloop.h>
+#include <arpa/inet.h>
+#include "omcproxy.h"
+
+struct group {
+       struct avl_node node;
+       struct in6_addr addr;
+       struct list_head sources;
+       size_t source_count;
+       omgp_time_t exclude_until;
+       omgp_time_t compat_v2_until;
+       omgp_time_t compat_v1_until;
+       omgp_time_t next_generic_transmit;
+       omgp_time_t next_source_transmit;
+       int retransmit;
+};
+
+struct group_source {
+       struct list_head head;
+       struct in6_addr addr;
+       omgp_time_t include_until;
+       int retransmit;
+};
+
+struct groups_config {
+       omgp_time_t query_response_interval;
+       omgp_time_t query_interval;
+       omgp_time_t last_listener_query_interval;
+       int robustness;
+       int last_listener_query_count;
+};
+
+struct groups {
+       struct groups_config cfg_v4;
+       struct groups_config cfg_v6;
+       struct avl_tree groups;
+       struct uloop_timeout timer;
+       size_t source_limit;
+       size_t group_limit;
+       void (*cb_query)(struct groups *g, const struct in6_addr *addr,
+                       const struct list_head *sources, bool suppress);
+       void (*cb_update)(struct groups *g, struct group *group, omgp_time_t now);
+};
+
+
+void groups_init(struct groups *groups);
+void groups_deinit(struct groups *groups);
+
+
+enum groups_update {
+       UPDATE_UNSPEC,
+       UPDATE_IS_INCLUDE       = 1,
+       UPDATE_IS_EXCLUDE       = 2,
+       UPDATE_TO_IN            = 3,
+       UPDATE_TO_EX            = 4,
+       UPDATE_ALLOW            = 5,
+       UPDATE_BLOCK            = 6,
+       UPDATE_REPORT           = 7,
+       UPDATE_REPORT_V1        = 8,
+       UPDATE_DONE                     = 9,
+       UPDATE_SET_IN           = 0x11,
+       UPDATE_SET_EX           = 0x12,
+};
+
+void groups_update_config(struct groups *groups, bool v6,
+               omgp_time_t query_response_interval, omgp_time_t query_interval, int robustness);
+
+void groups_update_timers(struct groups *groups,
+               const struct in6_addr *groupaddr,
+               const struct in6_addr *addrs, size_t len);
+
+void groups_update_state(struct groups *groups,
+               const struct in6_addr *groupaddr,
+               const struct in6_addr *addrs, size_t len,
+               enum groups_update update);
+
+void groups_synthesize_events(struct groups *groups);
+
+// Groups user query API
+
+static inline bool group_is_included(const struct group *group, omgp_time_t time)
+{
+       return group->exclude_until <= time;
+}
+
+static inline bool source_is_included(const struct group_source *source, omgp_time_t time)
+{
+       return source->include_until > time;
+}
+
+#define groups_for_each_group(group, groupsp) \
+       avl_for_each_element(&(groupsp)->groups, group, node)
+
+#define group_for_each_source(source, group) \
+       list_for_each_entry(source, &(group)->sources, head)
+
+#define group_for_each_active_source(source, group, time) \
+       list_for_each_entry(source, &group->sources, head) \
+               if (source_is_included(source, time) == group_is_included(group, time))
+
+const struct group* groups_get(struct groups *groups, const struct in6_addr *addr);
+bool groups_includes_group(struct groups *groups, const struct in6_addr *addr,
+               const struct in6_addr *src, omgp_time_t time);
diff --git a/src/igmp.c b/src/igmp.c
new file mode 100644 (file)
index 0000000..aa03466
--- /dev/null
@@ -0,0 +1,209 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <errno.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <netinet/ip.h>
+#include <netinet/in.h>
+#include <net/if.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "querier.h"
+
+// Test if multicast-group is valid and relevant
+static inline bool igmp_is_valid_group(in_addr_t group)
+{
+       return IN_MULTICAST(be32_to_cpu(group));
+}
+
+// Handle an IGMP-record from an IGMP-packet (called by igmp_receive)
+static ssize_t igmp_handle_record(struct groups *groups, const uint8_t *data, size_t len)
+{
+       struct igmpv3_grec *r = (struct igmpv3_grec*)data;
+
+       if (len < sizeof(*r))
+               return -1;
+
+       size_t nsrc = ntohs(r->grec_nsrcs);
+       size_t read = sizeof(*r) + nsrc * sizeof(struct in_addr) + r->grec_auxwords * 4;
+
+       if (len < read)
+               return -1;
+
+       if (r->grec_type >= UPDATE_IS_INCLUDE && r->grec_type <= UPDATE_BLOCK &&
+                       igmp_is_valid_group(r->grec_mca)) {
+               struct in6_addr addr, sources[nsrc];
+               querier_map(&addr, r->grec_mca);
+
+               for (size_t i = 0; i < nsrc; ++i)
+                       querier_map(&sources[i], r->grec_src[i]);
+
+               groups_update_state(groups, &addr, sources, nsrc, r->grec_type);
+       }
+
+       return read;
+}
+
+// Receive and parse an IGMP-packet (called by uloop as callback)
+void igmp_handle(struct mrib_querier *mrib, const struct igmphdr *igmp, size_t len,
+               const struct sockaddr_in *from)
+{
+       struct querier_iface *q = container_of(mrib, struct querier_iface, mrib);
+       omgp_time_t now = omgp_time();
+       char addrbuf[INET_ADDRSTRLEN];
+       struct in6_addr group;
+
+       querier_map(&group, igmp->group);
+       inet_ntop(AF_INET, &from->sin_addr, addrbuf, sizeof(addrbuf));
+
+       if (igmp->type == IGMP_HOST_MEMBERSHIP_QUERY) {
+               struct igmpv3_query *query = (struct igmpv3_query*)igmp;
+
+               if (len != sizeof(*igmp) && ((size_t)len < sizeof(*query) ||
+                               (size_t)len < sizeof(*query) + ntohs(query->nsrcs) * sizeof(struct in_addr)))
+                       return;
+
+               if (query->group && !igmp_is_valid_group(query->group))
+                       return;
+
+               // Setup query target address
+               struct in_addr addr;
+               if (mrib_igmp_source(mrib, &addr))
+                       return;
+
+               bool suppress = false;
+               size_t nsrc = 0;
+               int robustness = 2;
+               omgp_time_t mrd = 10000;
+               omgp_time_t query_interval = 125000;
+
+               if (igmp->code)
+                       mrd = 100 * ((len == sizeof(*igmp)) ? igmp->code : querier_qqi(igmp->code));
+
+               if ((size_t)len > sizeof(*igmp)) {
+                       if (query->qrv)
+                               robustness = query->qrv;
+
+                       if (query->qqic)
+                               query_interval = querier_qqi(query->qqic) * 1000;
+
+                       suppress = query->suppress;
+                       nsrc = ntohs(query->nsrcs);
+               }
+
+               if (!suppress && query->group) {
+                       struct in6_addr sources[nsrc];
+                       for (size_t i = 0; i < nsrc; ++i)
+                               querier_map(&sources[i], query->srcs[i]);
+
+                       groups_update_timers(&q->groups, &group, sources, nsrc);
+               }
+
+               int election = memcmp(&from->sin_addr, &addr, sizeof(from->sin_addr));
+               L_INFO("%s: detected other querier %s with priority %d on %d",
+                               __FUNCTION__, addrbuf, election, q->ifindex);
+
+               // TODO: we ignore IGMPv1/v2 queriers for now, since a lot of them are dumb switches
+
+               if (election < 0 && !query->group && len > sizeof(*igmp)) {
+                       groups_update_config(&q->groups, false, mrd, query_interval, robustness);
+
+                       q->igmp_other_querier = true;
+                       q->igmp_next_query = now + (q->groups.cfg_v4.query_response_interval / 2) +
+                               (q->groups.cfg_v4.robustness * q->groups.cfg_v4.query_interval);
+               }
+       } else if (igmp->type == IGMPV3_HOST_MEMBERSHIP_REPORT) {
+               struct igmpv3_report *report = (struct igmpv3_report*)igmp;
+
+               if ((size_t)len <= sizeof(*report))
+                       return;
+
+               uint8_t *ibuf = (uint8_t*)igmp;
+               size_t count = ntohs(report->ngrec);
+               size_t offset = sizeof(*report);
+
+               while (count > 0 && offset < len) {
+                       ssize_t read = igmp_handle_record(&q->groups, &ibuf[offset], len - offset);
+                       if (read < 0)
+                               break;
+
+                       offset += read;
+                       --count;
+               }
+       } else if (igmp->type == IGMPV2_HOST_MEMBERSHIP_REPORT ||
+                       igmp->type == IGMP_HOST_LEAVE_MESSAGE ||
+                       igmp->type == IGMP_HOST_MEMBERSHIP_REPORT) {
+
+               if (len != sizeof(*igmp) || !igmp_is_valid_group(igmp->group))
+                       return;
+
+               groups_update_state(&q->groups, &group, NULL, 0,
+                               (igmp->type == IGMPV2_HOST_MEMBERSHIP_REPORT) ? UPDATE_REPORT :
+                               (igmp->type == IGMP_HOST_MEMBERSHIP_REPORT) ? UPDATE_REPORT_V1 : UPDATE_DONE);
+       }
+
+       uloop_timeout_set(&q->timeout, 0);
+}
+
+// Send generic / group-specific / group-and-source specific IGMP-query
+int igmp_send_query(struct querier_iface *q,
+               const struct in6_addr *group,
+               const struct list_head *sources,
+               bool suppress)
+{
+       uint8_t qqic = querier_qqic(((group) ? q->groups.cfg_v4.last_listener_query_interval :
+                       q->groups.cfg_v4.query_response_interval) / 100);
+       struct {
+               struct igmpv3_query q;
+               struct in_addr srcs[QUERIER_MAX_SOURCE];
+       } query = {.q = {
+               .type = IGMP_HOST_MEMBERSHIP_QUERY,
+               .code = qqic,
+               .qrv = q->groups.cfg_v4.robustness,
+               .suppress = suppress,
+               .qqic = querier_qqic(q->groups.cfg_v4.query_interval / 1000),
+       }};
+
+       struct group_source *s;
+       size_t cnt = 0;
+       if (sources) {
+               list_for_each_entry(s, sources, head) {
+                       if (cnt >= QUERIER_MAX_SOURCE) {
+                               L_WARN("%s: maximum source count (%d) exceeded",
+                                               __FUNCTION__, QUERIER_MAX_SOURCE);
+                               break;
+                       }
+
+                       query.q.srcs[cnt] = querier_unmap(&s->addr);
+               }
+       }
+       query.q.nsrcs = htons(cnt);
+
+       struct sockaddr_in dest = { .sin_family = AF_INET, .sin_addr = {htonl(0xe0000001U)}};
+       if (group) {
+               query.q.group = querier_unmap(group);
+               dest.sin_addr.s_addr = query.q.group;
+       }
+
+       return mrib_send_igmp(&q->mrib, &query.q,
+                       sizeof(query.q) + cnt * sizeof(query.srcs[0]), &dest);
+}
+
diff --git a/src/mld.c b/src/mld.c
new file mode 100644 (file)
index 0000000..c6734b1
--- /dev/null
+++ b/src/mld.c
@@ -0,0 +1,198 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <errno.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/timerfd.h>
+
+#include <netinet/in.h>
+#include <netinet/ip6.h>
+#include <netinet/icmp6.h>
+#include <unistd.h>
+#include <ifaddrs.h>
+#include <linux/mroute6.h>
+
+#include "mrib.h"
+#include "querier.h"
+
+struct mld_query {
+       struct mld_hdr mld;
+       uint8_t s_qrv;
+       uint8_t qqic;
+       uint16_t nsrc;
+       struct in6_addr addrs[0];
+};
+
+// Test whether group address is valid and interesting
+static inline bool mld_is_valid_group(const struct in6_addr *addr)
+{
+       return IN6_IS_ADDR_MULTICAST(addr);
+}
+
+// Handle Multicast Address Record from MLD-Packets (called by mld_receive)
+static ssize_t mld_handle_record(struct groups *groups, const uint8_t *data, size_t len)
+{
+       struct mld_record {
+               uint8_t type;
+               uint8_t aux;
+               uint16_t nsrc;
+               struct in6_addr addr;
+               struct in6_addr sources[];
+       } *r = (struct mld_record*)data;
+
+       if (len < sizeof(*r))
+               return -1;
+
+       size_t nsrc = ntohs(r->nsrc);
+       size_t read = sizeof(*r) + nsrc * sizeof(struct in6_addr) + r->aux;
+       if (len < read)
+               return -1;
+
+       if (r->type >= UPDATE_IS_INCLUDE && r->type <= UPDATE_BLOCK && mld_is_valid_group(&r->addr))
+               groups_update_state(groups, &r->addr, r->sources, nsrc, r->type);
+
+       return read;
+}
+
+// Receive an MLD-Packet from a node (called by uloop as callback)
+void mld_handle(struct mrib_querier *mrib, const struct mld_hdr *hdr, size_t len,
+               const struct sockaddr_in6 *from)
+{
+       char addrbuf[INET_ADDRSTRLEN];
+       omgp_time_t now = omgp_time();
+       inet_ntop(AF_INET6, &hdr->mld_addr, addrbuf, sizeof(addrbuf));
+
+       struct querier_iface *q = container_of(mrib, struct querier_iface, mrib);
+       if (hdr->mld_icmp6_hdr.icmp6_type == ICMPV6_MGM_QUERY) {
+               struct mld_query *query = (struct mld_query*)hdr;
+
+               if (len != 24 && ((size_t)len < sizeof(*query) ||
+                               (size_t)len < sizeof(*query) + ntohs(query->nsrc) * sizeof(struct in6_addr)))
+                       return;
+
+               if (!IN6_IS_ADDR_UNSPECIFIED(&query->mld.mld_addr) &&
+                               !mld_is_valid_group(&query->mld.mld_addr))
+                       return;
+
+               // Detect local source address
+               struct in6_addr addr;
+               if (mrib_mld_source(mrib, &addr))
+                       return;
+
+               bool suppress = false;
+               size_t nsrc = 0;
+               int robustness = 2;
+               omgp_time_t mrd = 10000;
+               omgp_time_t query_interval = 125000;
+
+               if (query->mld.mld_icmp6_hdr.icmp6_dataun.icmp6_un_data16[0])
+                       mrd = (len == 24) ? ntohs(query->mld.mld_icmp6_hdr.icmp6_dataun.icmp6_un_data16[0]) :
+                                       querier_mrd(query->mld.mld_icmp6_hdr.icmp6_dataun.icmp6_un_data16[0]);
+
+               if (len > 24) {
+                       if (query->s_qrv & 0x7)
+                               robustness = query->s_qrv & 0x7;
+
+                       if (query->qqic)
+                               query_interval = querier_qqi(query->qqic) * 1000;
+               }
+
+               if (!suppress && !IN6_IS_ADDR_UNSPECIFIED(&query->mld.mld_addr))
+                       groups_update_timers(&q->groups, &query->mld.mld_addr, query->addrs, nsrc);
+
+               int election = memcmp(&from->sin6_addr, &addr, sizeof(from->sin6_addr));
+               L_INFO("%s: detected other querier %s with priority %d on %d",
+                               __FUNCTION__, addrbuf, election, q->ifindex);
+
+               // TODO: we ignore MLDv1 queriers for now, since a lot of them are dumb switches
+
+               if (election < 0 && IN6_IS_ADDR_UNSPECIFIED(&query->mld.mld_addr) && len > 24) {
+                       groups_update_config(&q->groups, true, mrd, query_interval, robustness);
+
+                       q->mld_other_querier = true;
+                       q->mld_next_query = now + (q->groups.cfg_v6.query_response_interval / 2) +
+                               (q->groups.cfg_v6.robustness * q->groups.cfg_v6.query_interval);
+               }
+       } else if (hdr->mld_icmp6_hdr.icmp6_type == ICMPV6_MLD2_REPORT) {
+               struct icmp6_hdr *mld_report = (struct icmp6_hdr *)hdr;
+               if ((size_t)len <= sizeof(*mld_report))
+                       return;
+
+               uint8_t *buf = (uint8_t*)hdr;
+               size_t count = ntohs(mld_report->icmp6_dataun.icmp6_un_data16[1]);
+               ssize_t offset = sizeof(*mld_report);
+
+               while (count > 0 && offset < (ssize_t)len) {
+                       ssize_t read = mld_handle_record(&q->groups, &buf[offset], len - offset);
+                       if (read < 0)
+                               break;
+
+                       offset += read;
+                       --count;
+               }
+       } else if (hdr->mld_icmp6_hdr.icmp6_type == MLD_LISTENER_REPORT ||
+                       hdr->mld_icmp6_hdr.icmp6_type == MLD_LISTENER_REDUCTION) {
+               if (len != 24 || !mld_is_valid_group(&hdr->mld_addr))
+                       return;
+
+               groups_update_state(&q->groups, &hdr->mld_addr, NULL, 0,
+                               (hdr->mld_icmp6_hdr.icmp6_type == MLD_LISTENER_REPORT) ? UPDATE_REPORT : UPDATE_DONE);
+       }
+       uloop_timeout_set(&q->timeout, 0);
+}
+
+
+// Send generic / group-specific / group-and-source-specific queries
+ssize_t mld_send_query(struct querier_iface *q, const struct in6_addr *group,
+               const struct list_head *sources, bool suppress)
+{
+       uint16_t mrc = querier_mrc((group) ? q->groups.cfg_v6.last_listener_query_interval :
+                       q->groups.cfg_v6.query_response_interval);
+       struct {
+               struct mld_query q;
+               struct in6_addr addrs[QUERIER_MAX_SOURCE];
+       } query = {.q = {
+               .mld = {.mld_icmp6_hdr = {MLD_LISTENER_QUERY, 0, 0, {.icmp6_un_data16 = {mrc, 0}}}},
+               .s_qrv = (q->groups.cfg_v6.robustness & 0x7) | (suppress ? QUERIER_SUPPRESS : 0),
+               .qqic = querier_qqic(q->groups.cfg_v6.query_interval / 1000),
+       }};
+
+       struct group_source *s;
+       size_t cnt = 0;
+       if (sources) {
+               list_for_each_entry(s, sources, head) {
+                       if (cnt >= QUERIER_MAX_SOURCE)
+                               break; // TODO: log source overflow
+
+                       query.addrs[cnt++] = s->addr;
+               }
+       }
+       query.q.nsrc = htons(cnt);
+
+       struct sockaddr_in6 dest = {AF_INET6, 0, 0, IPV6_ALL_NODES_INIT, q->ifindex};
+
+       if (group)
+               query.q.mld.mld_addr = dest.sin6_addr = *group;
+
+       return mrib_send_mld(&q->mrib, &query.q.mld,
+                       sizeof(query.q) + cnt * sizeof(query.addrs[0]), &dest);
+}
diff --git a/src/mrib.c b/src/mrib.c
new file mode 100644 (file)
index 0000000..476768b
--- /dev/null
@@ -0,0 +1,783 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <unistd.h>
+#include <net/if.h>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+
+#include <linux/mroute.h>
+#include <linux/mroute6.h>
+
+#include <libubox/uloop.h>
+
+#include "omcproxy.h"
+#include "mrib.h"
+
+struct mrib_route {
+       struct list_head head;
+       struct in6_addr group;
+       struct in6_addr source;
+       omgp_time_t valid_until;
+};
+
+struct mrib_iface {
+       int ifindex;
+       struct list_head users;
+       struct list_head routes;
+       struct list_head queriers;
+       struct uloop_timeout timer;
+};
+
+static uint32_t ipv4_rtr_alert = cpu_to_be32(0x94040000);
+static struct {
+       struct ip6_hbh hdr;
+       struct ip6_opt_router rt;
+       uint8_t pad[2];
+} ipv6_rtr_alert = {
+       .hdr = {0, 0},
+       .rt = {IP6OPT_ROUTER_ALERT, 2, {0, IP6_ALERT_MLD}},
+       .pad = {0, 0}
+};
+
+static struct mrib_iface mifs[MAXMIFS] = {};
+static struct uloop_fd mrt_fd = { .fd = -1 };
+static struct uloop_fd mrt6_fd = { .fd = -1 };
+
+
+// Unmap IPv4 address from IPv6
+static inline void mrib_unmap(struct in_addr *addr4, const struct in6_addr *addr6)
+{
+       addr4->s_addr = addr6->s6_addr32[3];
+}
+
+// Add / delete multicast route
+static int mrib_set(const struct in6_addr *group, const struct in6_addr *source,
+               struct mrib_iface *iface, mrib_filter dest, bool del)
+{
+       int status = 0;
+       size_t mifid = iface - mifs;
+       if (IN6_IS_ADDR_V4MAPPED(group)) {
+               struct mfcctl ctl = { .mfcc_parent = mifid };
+               mrib_unmap(&ctl.mfcc_origin, source);
+               mrib_unmap(&ctl.mfcc_mcastgrp, group);
+
+               if(!del)
+                       for (size_t i = 0; i < MAXMIFS; ++i)
+                               if (dest & (1 << i))
+                                       ctl.mfcc_ttls[i] = 1;
+
+               if (setsockopt(mrt_fd.fd, IPPROTO_IP,
+                               (del) ? MRT_DEL_MFC : MRT_ADD_MFC,
+                               &ctl, sizeof(ctl)))
+                       status = -errno;
+       } else {
+               struct mf6cctl ctl = {
+                       .mf6cc_origin = {AF_INET6, 0, 0, *source, 0},
+                       .mf6cc_mcastgrp = {AF_INET6, 0, 0, *group, 0},
+                       .mf6cc_parent = mifid,
+               };
+
+               if(!del)
+                       for (size_t i = 0; i < MAXMIFS; ++i)
+                               if (dest & (1 << i))
+                                       IF_SET(i, &ctl.mf6cc_ifset);
+
+               if (setsockopt(mrt6_fd.fd, IPPROTO_IPV6,
+                               (del) ? MRT6_DEL_MFC : MRT6_ADD_MFC,
+                               &ctl, sizeof(ctl)))
+                       status = -errno;
+       }
+
+       char groupbuf[INET6_ADDRSTRLEN], sourcebuf[INET6_ADDRSTRLEN];
+       inet_ntop(AF_INET6, group, groupbuf, sizeof(groupbuf));
+       inet_ntop(AF_INET6, source, sourcebuf, sizeof(sourcebuf));
+       if(del) {
+               L_DEBUG("%s: deleting MFC-entry for %s from %s%%%d: %s",
+                               __FUNCTION__, groupbuf, sourcebuf, iface->ifindex, strerror(-status));
+       } else {
+               int ifbuf_len = 0;
+               char ifbuf[256] = {0};
+               for (size_t i = 0; i < MAXMIFS; ++i)
+                       if (dest & (1 << i))
+                               ifbuf_len += snprintf(&ifbuf[ifbuf_len], sizeof(ifbuf) - ifbuf_len, " %d", mifs[i].ifindex);
+
+
+               L_DEBUG("%s: setting MFC-entry for %s from %s%%%d to%s: %s",
+                               __FUNCTION__, groupbuf, sourcebuf, iface->ifindex, ifbuf, strerror(-status));
+       }
+
+       return status;
+}
+
+
+// We have no way of knowing when a source disappears, so we delete multicast routes from time to time
+static void mrib_clean(struct uloop_timeout *t)
+{
+       struct mrib_iface *iface = container_of(t, struct mrib_iface, timer);
+       omgp_time_t now = omgp_time();
+       uloop_timeout_cancel(t);
+
+       struct mrib_route *c, *n;
+       list_for_each_entry_safe(c, n, &iface->routes, head) {
+               if (c->valid_until <= now || (list_empty(&iface->users) && list_empty(&iface->queriers))) {
+                       mrib_set(&c->group, &c->source, iface, 0, 1);
+                       list_del(&c->head);
+                       free(c);
+               } else {
+                       uloop_timeout_set(t, c->valid_until - now);
+                       break;
+               }
+       }
+}
+
+
+// Find MIFID by ifindex
+static size_t mrib_find(int ifindex)
+{
+       size_t i = 0;
+       while (i < MAXMIFS && mifs[i].ifindex != ifindex)
+               ++i;
+       return i;
+}
+
+// Notify all users of a new multicast source
+static void mrib_notify_newsource(struct mrib_iface *iface,
+               const struct in6_addr *group, const struct in6_addr *source)
+{
+       mrib_filter filter = 0;
+       struct mrib_user *user;
+       list_for_each_entry(user, &iface->users, head)
+               if (user->cb_newsource)
+                       user->cb_newsource(user, group, source, &filter);
+
+       char groupbuf[INET6_ADDRSTRLEN], sourcebuf[INET6_ADDRSTRLEN];
+       inet_ntop(AF_INET6, group, groupbuf, sizeof(groupbuf));
+       inet_ntop(AF_INET6, source, sourcebuf, sizeof(sourcebuf));
+       L_DEBUG("%s: detected new multicast source %s for %s on %d",
+                       __FUNCTION__, sourcebuf, groupbuf, iface->ifindex);
+
+       struct mrib_route *route = malloc(sizeof(*route));
+       if (route) {
+               route->group = *group;
+               route->source = *source;
+               route->valid_until = omgp_time() + MRIB_DEFAULT_LIFETIME * OMGP_TIME_PER_SECOND;
+
+               if (list_empty(&iface->routes))
+                       uloop_timeout_set(&iface->timer, MRIB_DEFAULT_LIFETIME * OMGP_TIME_PER_SECOND);
+
+               list_add_tail(&route->head, &iface->routes);
+               mrib_set(group, source, iface, filter, 0);
+       }
+}
+
+// Calculate IGMP-checksum
+static uint16_t igmp_checksum(const uint16_t *buf, size_t len)
+{
+       int32_t sum = 0;
+
+       while (len > 1) {
+               sum += *buf++;
+               sum = (sum + (sum >> 16)) & 0xffff;
+               len -= 2;
+       }
+
+       if (len == 1) {
+               sum += *((uint8_t*)buf);
+               sum += (sum + (sum >> 16)) & 0xffff;
+       }
+
+       return ~sum;
+}
+
+// Receive and handle MRT event
+static void mrib_receive_mrt(struct uloop_fd *fd, __unused unsigned flags)
+{
+       uint8_t buf[9216], cbuf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+       char addrbuf[INET_ADDRSTRLEN];
+       struct sockaddr_in from;
+
+       while (true) {
+               struct iovec iov = {buf, sizeof(buf)};
+               struct msghdr hdr = {
+                               .msg_name = (void*)&from,
+                               .msg_namelen = sizeof(from),
+                               .msg_iov = &iov,
+                               .msg_iovlen = 1,
+                               .msg_control = cbuf,
+                               .msg_controllen = sizeof(cbuf)
+               };
+
+               ssize_t len = recvmsg(fd->fd, &hdr, MSG_DONTWAIT);
+               if (len < 0 && errno == EAGAIN)
+                       break;
+
+               struct iphdr *iph = iov.iov_base;
+               if (len < (ssize_t)sizeof(*iph))
+                       continue;
+
+               if (iph->protocol == 0) {
+                       // Pseudo IP/IGMP-packet from kernel MC-API
+                       struct igmpmsg *msg = iov.iov_base;
+                       struct mrib_iface *iface = NULL;
+                       if (msg->im_vif < MAXMIFS)
+                               iface = &mifs[msg->im_vif];
+
+                       if (!iface) {
+                               L_WARN("MRT kernel-message for unknown MIF %i", msg->im_vif);
+                               continue;
+                       }
+
+                       if (msg->im_msgtype != IGMPMSG_NOCACHE) {
+                               L_WARN("Unknown MRT kernel-message %i on interface %d",
+                                               msg->im_msgtype, iface->ifindex);
+                               continue;
+                       }
+
+                       struct in6_addr dst = IN6ADDR_ANY_INIT;
+                       struct in6_addr src = IN6ADDR_ANY_INIT;
+                       dst.s6_addr32[2] = cpu_to_be32(0xffff);
+                       dst.s6_addr32[3] = msg->im_dst.s_addr;
+                       src.s6_addr32[2] = cpu_to_be32(0xffff);
+                       src.s6_addr32[3] = msg->im_src.s_addr;
+
+                       mrib_notify_newsource(iface, &dst, &src);
+               } else {
+                       // IGMP packet
+                       if ((len -= iph->ihl * 4) < 0)
+                               continue;
+
+                       int ifindex = 0;
+                       for (struct cmsghdr *ch = CMSG_FIRSTHDR(&hdr); ch != NULL; ch = CMSG_NXTHDR(&hdr, ch)) {
+                               if (ch->cmsg_level == IPPROTO_IP && ch->cmsg_type == IP_PKTINFO) {
+                                       struct in_pktinfo *info = (struct in_pktinfo*)CMSG_DATA(ch);
+                                       ifindex = info->ipi_ifindex;
+                               }
+                       }
+
+                       if (ifindex == 0)
+                               continue;
+
+                       inet_ntop(AF_INET, &from.sin_addr, addrbuf, sizeof(addrbuf));
+                       struct igmphdr *igmp = (struct igmphdr*)&buf[iph->ihl * 4];
+
+                       uint16_t checksum = igmp->csum;
+                       igmp->csum = 0;
+
+                       if (iph->ttl != 1 || len < (ssize_t)sizeof(*igmp) ||
+                                       checksum != igmp_checksum((uint16_t*)igmp, len)) {
+                               L_WARN("%s: ignoring invalid IGMP-message of type %x from %s on %d",
+                                               __FUNCTION__, igmp->type, addrbuf, ifindex);
+                               continue;
+                       }
+
+                       uint32_t *opts = (uint32_t*)&iph[1];
+                       bool alert = (void*)&opts[1] <= (void*)igmp && *opts == ipv4_rtr_alert;
+                       if (!alert && (igmp->type != IGMP_HOST_MEMBERSHIP_QUERY ||
+                                                       (size_t)len > sizeof(*igmp) || igmp->code > 0)) {
+                               L_WARN("%s: ignoring invalid IGMP-message of type %x from %s on %d",
+                                               __FUNCTION__, igmp->type, addrbuf, ifindex);
+                               continue;
+                       }
+
+                       ssize_t mifid = mrib_find(ifindex);
+                       if (mifid < MAXMIFS) {
+                               struct mrib_querier *q;
+                               list_for_each_entry(q, &mifs[mifid].queriers, head)
+                                       if (q->cb_igmp)
+                                               q->cb_igmp(q, igmp, len, &from);
+                       }
+               }
+       }
+}
+
+// Receive and handle MRT6 event
+static void mrib_receive_mrt6(struct uloop_fd *fd, __unused unsigned flags)
+{
+       uint8_t buf[9216], cbuf[128];
+       char addrbuf[INET6_ADDRSTRLEN];
+       struct sockaddr_in6 from;
+
+       while (true) {
+               struct iovec iov = {buf, sizeof(buf)};
+               struct msghdr hdr = {
+                               .msg_name = (void*)&from,
+                               .msg_namelen = sizeof(from),
+                               .msg_iov = &iov,
+                               .msg_iovlen = 1,
+                               .msg_control = cbuf,
+                               .msg_controllen = sizeof(cbuf)
+               };
+
+               ssize_t len = recvmsg(fd->fd, &hdr, MSG_DONTWAIT);
+               if (len < 0 && errno == EAGAIN)
+                       break;
+
+               struct mld_hdr *mld = iov.iov_base;
+               if (len < (ssize_t)sizeof(*mld))
+                       continue;
+
+               if (mld->mld_icmp6_hdr.icmp6_type == 0) {
+                       // Pseudo ICMPv6/MLD-packet from kernel MC-API
+                       struct mrt6msg *msg = iov.iov_base;
+                       struct mrib_iface *iface = NULL;
+                       if (msg->im6_mif < MAXMIFS)
+                               iface = &mifs[msg->im6_mif];
+
+                       if (!iface) {
+                               L_WARN("MRT6 kernel-message for unknown MIF %i", msg->im6_mif);
+                               continue;
+                       }
+
+                       if (msg->im6_msgtype != MRT6MSG_NOCACHE) {
+                               L_WARN("Unknown MRT6 kernel-message %i on interface %d",
+                                               msg->im6_msgtype, iface->ifindex);
+                               continue;
+                       }
+
+                       mrib_notify_newsource(iface, &msg->im6_dst, &msg->im6_src);
+               } else {
+                       int hlim = 0, ifindex = from.sin6_scope_id;
+                       bool alert = false;
+                       for (struct cmsghdr *ch = CMSG_FIRSTHDR(&hdr); ch != NULL; ch = CMSG_NXTHDR(&hdr, ch)) {
+                               if (ch->cmsg_level == IPPROTO_IPV6 && ch->cmsg_type == IPV6_HOPLIMIT)
+                                       memcpy(&hlim, CMSG_DATA(ch), sizeof(hlim));
+                               else if (ch->cmsg_level == IPPROTO_IPV6 && ch->cmsg_type == IPV6_HOPOPTS &&
+                                               ch->cmsg_len >= CMSG_LEN(sizeof(ipv6_rtr_alert)) &&
+                                               memmem(CMSG_DATA(ch), ch->cmsg_len - CMSG_LEN(0),
+                                                               &ipv6_rtr_alert.rt, sizeof(ipv6_rtr_alert.rt)))
+                                       alert = true; // FIXME: memmem is wrong
+                       }
+                       inet_ntop(AF_INET6, &from.sin6_addr, addrbuf, sizeof(addrbuf));
+
+                       if (!IN6_IS_ADDR_LINKLOCAL(&from.sin6_addr) || hlim != 1 || len < 24 || !alert) {
+                               L_WARN("mld: ignoring invalid MLD-message of type %d from %s on %d",
+                                               mld->mld_icmp6_hdr.icmp6_type, addrbuf, ifindex);
+                               continue;
+                       }
+
+                       ssize_t mifid = mrib_find(from.sin6_scope_id);
+                       if (mifid < MAXMIFS) {
+                               struct mrib_querier *q;
+                               list_for_each_entry(q, &mifs[mifid].queriers, head)
+                                       if (q->cb_mld)
+                                               q->cb_mld(q, mld, len, &from);
+                       }
+               }
+       }
+}
+
+// Send an IGMP-packet
+int mrib_send_igmp(struct mrib_querier *q, struct igmpv3_query *igmp, size_t len,
+               const struct sockaddr_in *dest)
+{
+       uint8_t cbuf[CMSG_SPACE(sizeof(struct in_pktinfo))] = {0};
+       struct iovec iov = {igmp, len};
+       struct msghdr msg = {
+                       .msg_name = (void*)dest,
+                       .msg_namelen = sizeof(*dest),
+                       .msg_iov = &iov,
+                       .msg_iovlen = 1,
+                       .msg_control = cbuf,
+                       .msg_controllen = sizeof(cbuf)
+       };
+
+       igmp->csum = 0;
+       igmp->csum = igmp_checksum((uint16_t*)igmp, len);
+
+       // Set control data (define destination interface)
+       struct cmsghdr *chdr = CMSG_FIRSTHDR(&msg);
+       chdr->cmsg_level = IPPROTO_IP;
+       chdr->cmsg_type = IP_PKTINFO;
+       chdr->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+       struct in_pktinfo *pktinfo = (struct in_pktinfo*)CMSG_DATA(chdr);
+       pktinfo->ipi_addr.s_addr = 0;
+       pktinfo->ipi_ifindex = q->iface->ifindex;
+       if (mrib_igmp_source(q, &pktinfo->ipi_spec_dst))
+               return -errno;
+
+       ssize_t s = sendmsg(mrt_fd.fd, &msg, MSG_DONTWAIT);
+       return (s < 0) ? -errno : (s < (ssize_t)len) ? -EMSGSIZE : 0;
+}
+
+// Send an IGMP-packet
+int mrib_send_mld(struct mrib_querier *q, struct mld_hdr *mld, size_t len,
+               const struct sockaddr_in6 *dest)
+{
+       uint8_t cbuf[CMSG_SPACE(sizeof(struct in6_pktinfo))] = {0};
+       struct iovec iov = {mld, len};
+       struct msghdr msg = {
+                       .msg_name = (void*)dest,
+                       .msg_namelen = sizeof(*dest),
+                       .msg_iov = &iov,
+                       .msg_iovlen = 1,
+                       .msg_control = cbuf,
+                       .msg_controllen = sizeof(cbuf)
+       };
+
+       // Set control data (define destination interface)
+       struct cmsghdr *chdr = CMSG_FIRSTHDR(&msg);
+       chdr->cmsg_level = IPPROTO_IPV6;
+       chdr->cmsg_type = IPV6_PKTINFO;
+       chdr->cmsg_len = CMSG_LEN(sizeof(struct in6_pktinfo));
+
+       struct in6_pktinfo *pktinfo = (struct in6_pktinfo*)CMSG_DATA(chdr);
+       pktinfo->ipi6_ifindex = q->iface->ifindex;
+       if (mrib_mld_source(q, &pktinfo->ipi6_addr))
+               return -errno;
+
+       ssize_t s = sendmsg(mrt6_fd.fd, &msg, MSG_DONTWAIT);
+       return (s < 0) ? -errno : (s < (ssize_t)len) ? -EMSGSIZE : 0;
+}
+
+// Initialize MRIB
+static int mrib_init(void)
+{
+       int fd;
+       int val;
+
+       if ((fd = socket(AF_INET, SOCK_RAW, IPPROTO_IGMP)) < 0)
+               goto err;
+
+       val = 1;
+       if (setsockopt(fd, IPPROTO_IP, MRT_INIT, &val, sizeof(val)))
+               goto err;
+
+       if (setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &val, sizeof(val)))
+               goto err;
+
+       // Configure IP header fields
+       if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &val, sizeof(val)))
+               goto err;
+
+       val = 0xc0;
+       if (setsockopt(fd, IPPROTO_IP, IP_TOS, &val, sizeof(val)))
+               goto err;
+
+       val = 0;
+       if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)))
+               goto err;
+
+       // Set router-alert option
+       if (setsockopt(fd, IPPROTO_IP, IP_OPTIONS, &ipv4_rtr_alert, sizeof(ipv4_rtr_alert)))
+               goto err;
+
+       mrt_fd.fd = fd;
+
+
+       if ((fd = socket(AF_INET6, SOCK_RAW, IPPROTO_ICMPV6)) < 0)
+               goto err;
+
+       // We need to know the source interface and hop-opts
+       val = 1;
+       if (setsockopt(fd, IPPROTO_IPV6, MRT6_INIT, &val, sizeof(val)))
+               goto err;
+
+       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVHOPOPTS, &val, sizeof(val)))
+               goto err;
+
+       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVHOPLIMIT, &val, sizeof(val)))
+               goto err;
+
+       // MLD has hoplimit 1
+       if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &val, sizeof(val)))
+               goto err;
+
+       val = 0;
+       if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &val, sizeof(val)))
+               goto err;
+
+       // Let the kernel compute our checksums
+       val = 2;
+       if (setsockopt(fd, IPPROTO_RAW, IPV6_CHECKSUM, &val, sizeof(val)))
+               goto err;
+
+       // Set hop-by-hop router alert on outgoing
+       if (setsockopt(fd, IPPROTO_IPV6, IPV6_HOPOPTS, &ipv6_rtr_alert, sizeof(ipv6_rtr_alert)))
+               goto err;
+
+       // Set ICMP6 filter
+       struct icmp6_filter flt;
+       ICMP6_FILTER_SETBLOCKALL(&flt);
+       ICMP6_FILTER_SETPASS(ICMPV6_MGM_QUERY, &flt);
+       ICMP6_FILTER_SETPASS(ICMPV6_MGM_REPORT, &flt);
+       ICMP6_FILTER_SETPASS(ICMPV6_MGM_REDUCTION, &flt);
+       ICMP6_FILTER_SETPASS(ICMPV6_MLD2_REPORT, &flt);
+       if (setsockopt(fd, IPPROTO_ICMPV6, ICMP6_FILTER, &flt, sizeof(flt)))
+               goto err;
+
+       mrt6_fd.fd = fd;
+
+       mrt_fd.cb = mrib_receive_mrt;
+       mrt6_fd.cb = mrib_receive_mrt6;
+
+       uloop_fd_add(&mrt_fd, ULOOP_READ | ULOOP_EDGE_TRIGGER);
+       uloop_fd_add(&mrt6_fd, ULOOP_READ | ULOOP_EDGE_TRIGGER);
+
+       fd = -1;
+       errno = 0;
+
+err:
+       if (fd >= 0)
+               close(fd);
+       return -errno;
+}
+
+// Create new interface entry
+static struct mrib_iface* mrib_get_iface(int ifindex)
+{
+       if (mrt_fd.fd < 0 && mrib_init() < 0)
+               return NULL;
+
+       size_t mifid = mrib_find(ifindex);
+       if (mifid < MAXMIFS)
+               return &mifs[mifid];
+
+       errno = EBUSY;
+       if ((mifid = mrib_find(0)) >= MAXMIFS)
+               return NULL;
+
+       struct mrib_iface *iface = &mifs[mifid];
+
+       struct vifctl ctl = {mifid, VIFF_USE_IFINDEX, 1, 0, { .vifc_lcl_ifindex = ifindex }, {INADDR_ANY}};
+       if (setsockopt(mrt_fd.fd, IPPROTO_IP, MRT_ADD_VIF, &ctl, sizeof(ctl)))
+               return NULL;
+
+       struct mif6ctl ctl6 = {mifid, 0, 1, ifindex, 0};
+       if (setsockopt(mrt6_fd.fd, IPPROTO_IPV6, MRT6_ADD_MIF, &ctl6, sizeof(ctl6)))
+               return NULL;
+
+       struct ip_mreqn mreq = {{INADDR_ALLIGMPV3RTRS_GROUP}, {INADDR_ANY}, ifindex};
+       setsockopt(mrt_fd.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
+
+       mreq.imr_multiaddr.s_addr = cpu_to_be32(INADDR_ALLRTRS_GROUP);
+       setsockopt(mrt_fd.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
+
+       struct ipv6_mreq mreq6 = {MLD2_ALL_MCR_INIT, ifindex};
+       setsockopt(mrt6_fd.fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6));
+
+       mreq6.ipv6mr_multiaddr.s6_addr[15] = 0x02;
+       setsockopt(mrt6_fd.fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6));
+
+       iface->timer.cb = mrib_clean;
+       iface->ifindex = ifindex;
+       INIT_LIST_HEAD(&iface->routes);
+       INIT_LIST_HEAD(&iface->users);
+       INIT_LIST_HEAD(&iface->queriers);
+       return iface;
+}
+
+// Remove interfaces if it has no more users
+static void mrib_clean_iface(struct mrib_iface *iface)
+{
+       if (list_empty(&iface->users) && list_empty(&iface->queriers)) {
+               iface->ifindex = 0;
+               mrib_clean(&iface->timer);
+
+               size_t mifid = iface - mifs;
+               struct vifctl ctl = {mifid, VIFF_USE_IFINDEX, 1, 0,
+                               { .vifc_lcl_ifindex = iface->ifindex }, {INADDR_ANY}};
+               setsockopt(mrt_fd.fd, IPPROTO_IP, MRT_DEL_VIF, &ctl, sizeof(ctl));
+
+               struct mif6ctl ctl6 = {mifid, 0, 1, iface->ifindex, 0};
+               setsockopt(mrt6_fd.fd, IPPROTO_IPV6, MRT6_DEL_MIF, &ctl6, sizeof(ctl6));
+
+               struct ip_mreqn mreq = {{INADDR_ALLIGMPV3RTRS_GROUP}, {INADDR_ANY}, iface->ifindex};
+               setsockopt(mrt_fd.fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq));
+
+               mreq.imr_multiaddr.s_addr = cpu_to_be32(INADDR_ALLRTRS_GROUP);
+               setsockopt(mrt_fd.fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq));
+
+               struct ipv6_mreq mreq6 = {MLD2_ALL_MCR_INIT, iface->ifindex};
+               setsockopt(mrt6_fd.fd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6));
+
+               mreq6.ipv6mr_multiaddr.s6_addr[15] = 0x02;
+               setsockopt(mrt6_fd.fd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6));
+       }
+}
+
+// Register a new interface to mrib
+int mrib_attach_user(struct mrib_user *user, int ifindex, mrib_cb *cb_newsource)
+{
+       struct mrib_iface *iface = mrib_get_iface(ifindex);
+       if (!iface)
+               return -errno;
+
+       if (user->iface == iface)
+               return -EALREADY;
+
+       list_add(&user->head, &iface->users);
+       user->iface = iface;
+       user->cb_newsource = cb_newsource;
+       return 0;
+}
+
+// Deregister an interface from mrib
+void mrib_detach_user(struct mrib_user *user)
+{
+       struct mrib_iface *iface = user->iface;
+       if (!iface)
+               return;
+
+       user->iface = NULL;
+       list_del(&user->head);
+       mrib_clean_iface(iface);
+}
+
+// Register a querier to mrib
+int mrib_attach_querier(struct mrib_querier *querier, int ifindex, mrib_igmp_cb *cb_igmp, mrib_mld_cb *cb_mld)
+{
+       struct mrib_iface *iface = mrib_get_iface(ifindex);
+       if (!iface)
+               return -errno;
+
+       list_add(&querier->head, &iface->queriers);
+       querier->iface = iface;
+       querier->cb_igmp = cb_igmp;
+       querier->cb_mld = cb_mld;
+       return 0;
+}
+
+// Deregister a querier from mrib
+void mrib_detach_querier(struct mrib_querier *querier)
+{
+       struct mrib_iface *iface = querier->iface;
+       if (!iface)
+               return;
+
+       querier->iface = NULL;
+       list_del(&querier->head);
+       mrib_clean_iface(iface);
+}
+
+static uint8_t prefix_contains(const struct in6_addr *p, uint8_t plen, const struct in6_addr *addr)
+{
+       int blen = plen >> 3;
+       if(blen && memcmp(p, addr, blen))
+               return 0;
+
+       int rem = plen & 0x07;
+       if(rem && ((p->s6_addr[blen] ^ addr->s6_addr[blen]) >> (8 - rem)))
+               return 0;
+
+       return 1;
+}
+
+// Flush state for a multicast route
+int mrib_flush(struct mrib_user *user, const struct in6_addr *group, uint8_t group_plen, const struct in6_addr *source)
+{
+       struct mrib_iface *iface = user->iface;
+       if (!iface)
+               return -ENOENT;
+
+       bool found = false;
+       struct mrib_route *route, *n;
+       list_for_each_entry_safe(route, n, &iface->routes, head) {
+               if (prefix_contains(group, group_plen, &route->group) &&
+                               (!source || IN6_ARE_ADDR_EQUAL(&route->source, source))) {
+                       route->valid_until = 0;
+                       list_del(&route->head);
+                       list_add(&route->head, &iface->routes);
+                       found = true;
+               }
+       }
+
+       if (found)
+               mrib_clean(&iface->timer);
+
+       return (found) ? 0 : -ENOENT;
+}
+
+// Add an interface to the filter
+int mrib_filter_add(mrib_filter *filter, struct mrib_user *user)
+{
+       struct mrib_iface *iface = user->iface;
+       if (!iface)
+               return -ENOENT;
+
+       *filter |= 1 << (iface - mifs);
+       return 0;
+}
+
+// Get MLD source address
+int mrib_mld_source(struct mrib_querier *q, struct in6_addr *source)
+{
+       struct sockaddr_in6 addr = {AF_INET6, 0, 0, MLD2_ALL_MCR_INIT, q->iface->ifindex};
+       socklen_t alen = sizeof(addr);
+       int sock = socket(AF_INET6, SOCK_RAW | SOCK_CLOEXEC, IPPROTO_ICMPV6);
+       int ret = 0;
+
+       if (sock < 0 || connect(sock, (struct sockaddr*)&addr, sizeof(addr)))
+               ret = -errno;
+
+       if (ret || getsockname(sock, (struct sockaddr*)&addr, &alen)) {
+               L_WARN("%s: failed to detect local source address on %d", __FUNCTION__, q->iface->ifindex);
+               ret = -errno;
+       }
+
+       close(sock);
+
+       if (ret == 0)
+               *source = addr.sin6_addr;
+
+       return ret;
+}
+
+
+// Get IGMP source address
+int mrib_igmp_source(struct mrib_querier *q, struct in_addr *source)
+{
+       struct sockaddr_in addr = {AF_INET, 0, {cpu_to_be32(INADDR_ALLHOSTS_GROUP)}, {0}};
+       socklen_t alen = sizeof(addr);
+       struct ifreq ifr = {.ifr_name = ""};
+       int sock = socket(AF_INET, SOCK_RAW | SOCK_CLOEXEC, IPPROTO_IGMP);
+       int ret = 0;
+
+       ifr.ifr_ifindex = q->iface->ifindex;
+
+       if (sock < 0 || ioctl(sock, SIOCGIFNAME, &ifr) ||
+                       setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, ifr.ifr_name, strlen(ifr.ifr_name)))
+               ret = -errno;
+
+
+       if (ret || connect(sock, (struct sockaddr*)&addr, sizeof(addr)))
+               ret = -errno;
+
+       if (ret || getsockname(sock, (struct sockaddr*)&addr, &alen)) {
+               L_WARN("%s: failed to detect local source address on %d", __FUNCTION__, q->iface->ifindex);
+               ret = -errno;
+       }
+
+       close(sock);
+
+       if (ret == 0)
+               *source = addr.sin_addr;
+
+       return ret;
+}
diff --git a/src/mrib.h b/src/mrib.h
new file mode 100644 (file)
index 0000000..b3dd72b
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <netinet/in.h>
+#include <libubox/list.h>
+#include <sys/socket.h>
+
+#include <netinet/icmp6.h>
+
+#define icmp6_filter icmpv6_filter
+#include <linux/igmp.h>
+#include <linux/icmpv6.h>
+#undef icmp6_filter
+
+#define MRIB_DEFAULT_LIFETIME 125
+
+#define IPV6_ALL_NODES_INIT            { { { 0xff,0x02,0,0,0,0,0,0,0,0,0,0,0,0,0,0x1 } } }
+#define INADDR_ALLIGMPV3RTRS_GROUP     cpu_to_be32(0xe0000016U)
+
+typedef uint32_t mrib_filter;
+struct mrib_iface;
+struct mrib_user;
+struct mrib_querier;
+
+typedef void(mrib_cb)(struct mrib_user *user, const struct in6_addr *group,
+               const struct in6_addr *source, mrib_filter *filter);
+
+typedef void(mrib_igmp_cb)(struct mrib_querier *mrib, const struct igmphdr *igmp, size_t len,
+               const struct sockaddr_in *from);
+
+typedef void(mrib_mld_cb)(struct mrib_querier *mrib, const struct mld_hdr *mld, size_t len,
+               const struct sockaddr_in6 *from);
+
+struct mrib_user {
+       struct list_head head;
+       struct mrib_iface *iface;
+       mrib_cb *cb_newsource;
+};
+
+struct mrib_querier {
+       struct list_head head;
+       struct mrib_iface *iface;
+       mrib_igmp_cb *cb_igmp;
+       mrib_mld_cb *cb_mld;
+};
+
+// Register a new user to mrib
+int mrib_attach_user(struct mrib_user *user, int ifindex, mrib_cb *cb_newsource);
+
+// Deregister a user from mrib
+void mrib_detach_user(struct mrib_user *user);
+
+// Register a querier to mrib
+int mrib_attach_querier(struct mrib_querier *querier, int ifindex, mrib_igmp_cb *cb_igmp, mrib_mld_cb *cb_mld);
+
+// Deregister a querier from mrib
+void mrib_detach_querier(struct mrib_querier *querier);
+
+// Flush state for a multicast route
+int mrib_flush(struct mrib_user *user, const struct in6_addr *group, uint8_t group_plen, const struct in6_addr *source);
+
+// Add interface to filter
+int mrib_filter_add(mrib_filter *filter, struct mrib_user *user);
+
+// Send IGMP-packet
+int mrib_send_igmp(struct mrib_querier *querier, struct igmpv3_query *igmp, size_t len,
+               const struct sockaddr_in *dest);
+
+// Send MLD-packet
+int mrib_send_mld(struct mrib_querier *querier, struct mld_hdr *mld, size_t len,
+               const struct sockaddr_in6 *dest);
+
+// Get source address
+int mrib_mld_source(struct mrib_querier *q, struct in6_addr *source);
+int mrib_igmp_source(struct mrib_querier *q, struct in_addr *source);
diff --git a/src/omcproxy.c b/src/omcproxy.c
new file mode 100644 (file)
index 0000000..2869a9e
--- /dev/null
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2015 Steven Barth <steven at midlink.org>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <stdio.h>
+#include <signal.h>
+#include <string.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <net/if.h>
+#include <unistd.h>
+
+#include <libubox/uloop.h>
+#include <libubox/blobmsg.h>
+
+#include "omcproxy.h"
+#include "proxy.h"
+
+int log_level = LOG_WARNING;
+
+
+enum {
+       PROXY_ATTR_SOURCE,
+       PROXY_ATTR_SCOPE,
+       PROXY_ATTR_DEST,
+       PROXY_ATTR_MAX,
+};
+
+static struct blobmsg_policy proxy_policy[PROXY_ATTR_MAX] = {
+       [PROXY_ATTR_SOURCE] = { .name = "source", .type = BLOBMSG_TYPE_STRING },
+       [PROXY_ATTR_SCOPE] = { .name = "scope", .type = BLOBMSG_TYPE_STRING },
+       [PROXY_ATTR_DEST] = { .name = "dest", .type = BLOBMSG_TYPE_ARRAY },
+};
+
+static int handle_proxy_set(void *data, size_t len)
+{
+       struct blob_attr *tb[PROXY_ATTR_MAX], *c;
+       blobmsg_parse(proxy_policy, PROXY_ATTR_MAX, tb, data, len);
+
+       const char *name = ((c = tb[PROXY_ATTR_SOURCE])) ? blobmsg_get_string(c) : NULL;
+       int uplink = 0;
+       int downlinks[32] = {0};
+       size_t downlinks_cnt = 0;
+       enum proxy_flags flags = 0;
+
+       if (!name)
+               return -EINVAL;
+
+       if (!(uplink = if_nametoindex(name))) {
+               L_WARN("%s(%s): %s", __FUNCTION__, name, strerror(errno));
+               return -errno;
+       }
+
+       if ((c = tb[PROXY_ATTR_SCOPE])) {
+               const char *scope = blobmsg_get_string(c);
+               if (!strcmp(scope, "global"))
+                       flags = PROXY_GLOBAL;
+               else if (!strcmp(scope, "organization"))
+                       flags = PROXY_ORGLOCAL;
+               else if (!strcmp(scope, "site"))
+                       flags = PROXY_SITELOCAL;
+               else if (!strcmp(scope, "admin"))
+                       flags = PROXY_ADMINLOCAL;
+               else if (!strcmp(scope, "realm"))
+                       flags = PROXY_REALMLOCAL;
+
+               if (!flags) {
+                       L_WARN("%s(%s): invalid scope (%s)", __FUNCTION__, name, scope);
+                       return -EINVAL;
+               }
+       }
+
+       if ((c = tb[PROXY_ATTR_DEST])) {
+               struct blob_attr *d;
+               unsigned rem;
+               blobmsg_for_each_attr(d, c, rem) {
+                       if (downlinks_cnt >= 32) {
+                               L_WARN("%s(%s): maximum number of destinations exceeded", __FUNCTION__, name);
+                               return -EINVAL;
+                       }
+
+                       const char *n = blobmsg_type(d) == BLOBMSG_TYPE_STRING ? blobmsg_get_string(d) : "";
+                       if (!(downlinks[downlinks_cnt++] = if_nametoindex(n))) {
+                               L_WARN("%s(%s): %s (%s)", __FUNCTION__, name, strerror(errno), blobmsg_get_string(d));
+                               return -errno;
+                       }
+               }
+       }
+
+       return proxy_set(uplink, downlinks, downlinks_cnt, flags);
+}
+
+static void handle_signal(__unused int signal)
+{
+       uloop_end();
+}
+
+static void usage(const char *arg) {
+       fprintf(stderr, "Usage: %s [options] <proxy1> [<proxy2>] [...]\n"
+                       "\nProxy examples:\n"
+                       "eth1,eth2\n"
+                       "eth1,eth2,eth3,scope=organization\n"
+                       "\nProxy options (each option may only occur once):\n"
+                       "       <interface>                     interfaces to proxy (first is uplink)\n"
+                       "       scope=<scope>                   minimum multicast scope to proxy\n"
+                       "               [global,organization,site,admin,realm] (default: global)\n"
+                       "\nOptions:\n"
+                       "       -v                              verbose logging\n"
+                       "       -h                              show this help\n",
+       arg);
+}
+
+int main(int argc, char **argv) {
+       signal(SIGINT, handle_signal);
+       signal(SIGTERM, handle_signal);
+       signal(SIGHUP, SIG_IGN);
+       signal(SIGPIPE, SIG_IGN);
+       openlog("omcproxy", LOG_PERROR, LOG_DAEMON);
+
+       if (getuid()) {
+               L_ERR("must be run as root!");
+               return 2;
+       }
+
+       uloop_init();
+       bool start = true;
+
+       for (ssize_t i = 1; i < argc; ++i) {
+               const char *source = NULL;
+               const char *scope = NULL;
+               struct blob_buf b = {NULL, NULL, 0, NULL};
+
+               if (!strcmp(argv[i], "-h")) {
+                       usage(argv[0]);
+                       return 1;
+               } else if (!strncmp(argv[i], "-v", 2)) {
+                       if ((log_level = atoi(&argv[i][2])) <= 0)
+                               log_level = 7;
+                       continue;
+               }
+
+
+               blob_buf_init(&b, 0);
+
+               void *k = blobmsg_open_array(&b, "dest");
+               for (char *c = strtok(argv[i], ","); c; c = strtok(NULL, ",")) {
+                       if (!strncmp(c, "scope=", 6)) {
+                               scope = &c[6];
+                       } else if (!source) {
+                               source = c;
+                       } else {
+                               blobmsg_add_string(&b, NULL, c);
+                       }
+               }
+               blobmsg_close_array(&b, k);
+
+               if (source)
+                       blobmsg_add_string(&b, "source", source);
+
+               if (scope)
+                       blobmsg_add_string(&b, "scope", scope);
+
+               if (handle_proxy_set(blob_data(b.head), blob_len(b.head))) {
+                       fprintf(stderr, "failed to setup proxy: %s\n", argv[i]);
+                       start = false;
+               }
+
+               blob_buf_free(&b);
+       }
+
+       if (argc < 2) {
+               usage(argv[0]);
+               start = false;
+       }
+
+       if (start)
+               uloop_run();
+
+       proxy_update(true);
+       proxy_flush();
+
+       uloop_done();
+       return 0;
+}
diff --git a/src/omcproxy.h b/src/omcproxy.h
new file mode 100644 (file)
index 0000000..b3e1ce3
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef OMGPROXY_H_
+#define OMGPROXY_H_
+
+#define OMGPROXY_DEFAULT_L_LEVEL 7
+
+#ifndef L_LEVEL
+#define L_LEVEL OMGPROXY_DEFAULT_L_LEVEL
+#endif /* !L_LEVEL */
+
+#ifndef L_PREFIX
+#define L_PREFIX ""
+#endif /* !L_PREFIX */
+
+#ifdef __APPLE__
+
+#define __APPLE_USE_RFC_3542
+#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
+#define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
+
+#include <sys/queue.h>
+#ifdef LIST_HEAD
+#undef LIST_HEAD
+#endif /* LIST_HEAD */
+
+#endif /* __APPLE__ */
+
+#include <stddef.h>
+#include <stdint.h>
+#include <time.h>
+#include <syslog.h>
+#include <sys/types.h>
+#include <libubox/utils.h>
+
+#define STR_EXPAND(tok) #tok
+#define STR(tok) STR_EXPAND(tok)
+
+typedef int64_t omgp_time_t;
+#define OMGP_TIME_MAX INT64_MAX
+#define OMGP_TIME_PER_SECOND INT64_C(1000)
+
+static inline omgp_time_t omgp_time(void) {
+       struct timespec ts;
+       clock_gettime(CLOCK_MONOTONIC, &ts);
+       return ((omgp_time_t)ts.tv_sec * OMGP_TIME_PER_SECOND) +
+                       ((omgp_time_t)ts.tv_nsec / (1000000000 / OMGP_TIME_PER_SECOND));
+}
+
+extern int log_level;
+
+// Logging macros
+
+#define L_INTERNAL(level, ...)                  \
+do {                                            \
+  if (log_level >= level)                       \
+    syslog(level, L_PREFIX __VA_ARGS__);        \
+ } while(0)
+
+#if L_LEVEL >= LOG_ERR
+#define L_ERR(...) L_INTERNAL(LOG_ERR, __VA_ARGS__)
+#else
+#define L_ERR(...) do {} while(0)
+#endif
+
+#if L_LEVEL >= LOG_WARNING
+#define L_WARN(...) L_INTERNAL(LOG_WARNING, __VA_ARGS__)
+#else
+#define L_WARN(...) do {} while(0)
+#endif
+
+#if L_LEVEL >= LOG_NOTICE
+#define L_NOTICE(...) L_INTERNAL(LOG_NOTICE, __VA_ARGS__)
+#else
+#define L_NOTICE(...) do {} while(0)
+#endif
+
+#if L_LEVEL >= LOG_INFO
+#define L_INFO(...) L_INTERNAL(LOG_INFO, __VA_ARGS__)
+#else
+#define L_INFO(...) do {} while(0)
+#endif
+
+#if L_LEVEL >= LOG_DEBUG
+#define L_DEBUG(...) L_INTERNAL(LOG_DEBUG, __VA_ARGS__)
+#else
+#define L_DEBUG(...) do {} while(0)
+#endif
+
+
+// Some C99 compatibility
+#ifndef typeof
+#define typeof __typeof
+#endif
+
+#ifndef container_of
+#define container_of(ptr, type, member) (           \
+    (type *)( (char *)ptr - offsetof(type,member) ))
+#endif
+
+#ifndef __unused
+#define __unused __attribute__((unused))
+#endif
+
+#endif /* PIMBD_H_ */
diff --git a/src/proxy.c b/src/proxy.c
new file mode 100644 (file)
index 0000000..67c91a7
--- /dev/null
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2015 Steven Barth <steven at midlink.org>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <errno.h>
+#include <libubox/list.h>
+
+#include "querier.h"
+#include "client.h"
+#include "mrib.h"
+#include "proxy.h"
+
+struct proxy {
+       struct list_head head;
+       int ifindex;
+       struct mrib_user mrib;
+       struct querier querier;
+       enum proxy_flags flags;
+};
+
+struct proxy_downlink {
+       struct querier_user_iface iface;
+       struct mrib_user mrib;
+       struct client client;
+       enum proxy_flags flags;
+};
+
+static struct list_head proxies = LIST_HEAD_INIT(proxies);
+
+// Remove and cleanup a downlink
+static void proxy_remove_downlink(struct proxy_downlink *downlink)
+{
+       mrib_detach_user(&downlink->mrib);
+       querier_detach(&downlink->iface);
+       client_deinit(&downlink->client);
+       free(downlink);
+}
+
+// Match scope of a multicast-group against proxy scope-filter
+static bool proxy_match_scope(enum proxy_flags flags, const struct in6_addr *addr)
+{
+       unsigned scope = 0;
+       if (IN6_IS_ADDR_V4MAPPED(addr)) {
+               if (addr->s6_addr[12] == 239 && addr->s6_addr[13] == 255)
+                       scope = PROXY_REALMLOCAL;
+               else if (addr->s6_addr[12] == 239 && (addr->s6_addr[13] & 0xfc) == 192)
+                       scope = PROXY_ORGLOCAL;
+               else if (addr->s6_addr[12] == 224 && addr->s6_addr[13] == 0 && addr->s6_addr[14] == 0)
+                       scope = 2;
+               else
+                       scope = PROXY_GLOBAL;
+       } else {
+               scope = addr->s6_addr[1] & 0xf;
+       }
+       return scope >= (flags & _PROXY_SCOPEMASK);
+}
+
+// Test and set multicast route (called by mrib on detection of new source)
+static void proxy_mrib(struct mrib_user *mrib, const struct in6_addr *group,
+               const struct in6_addr *source, mrib_filter *filter)
+{
+       struct proxy *proxy = container_of(mrib, struct proxy, mrib);
+       if (!proxy_match_scope(proxy->flags, group))
+               return;
+
+       omgp_time_t now = omgp_time();
+       struct querier_user *user;
+       list_for_each_entry(user, &proxy->querier.ifaces, head) {
+               if (groups_includes_group(user->groups, group, source, now)) {
+                       struct querier_user_iface *iface = container_of(user, struct querier_user_iface, user);
+                       struct proxy_downlink *downlink = container_of(iface, struct proxy_downlink, iface);
+                       mrib_filter_add(filter, &downlink->mrib);
+               }
+       }
+}
+
+// Update proxy state (called from querier on change of combined group-state)
+static void proxy_trigger(struct querier_user_iface *user, const struct in6_addr *group,
+               bool include, const struct in6_addr *sources, size_t len)
+{
+       struct proxy_downlink *iface = container_of(user, struct proxy_downlink, iface);
+       if (proxy_match_scope(iface->flags, group))
+               client_set(&iface->client, group, include, sources, len);
+}
+
+// Remove proxy with given name
+static int proxy_unset(struct proxy *proxyp)
+{
+       bool found = false;
+       struct proxy *proxy, *n;
+       list_for_each_entry_safe(proxy, n, &proxies, head) {
+               if ((proxyp && proxy == proxyp) ||
+                               (!proxyp && (proxy->flags & _PROXY_UNUSED))) {
+                       mrib_detach_user(&proxy->mrib);
+
+                       struct querier_user *user, *n;
+                       list_for_each_entry_safe(user, n, &proxy->querier.ifaces, head) {
+                               struct querier_user_iface *i = container_of(user, struct querier_user_iface, user);
+                               proxy_remove_downlink(container_of(i, struct proxy_downlink, iface));
+                       }
+
+                       querier_deinit(&proxy->querier);
+                       list_del(&proxy->head);
+                       free(proxy);
+                       found = true;
+               }
+       }
+       return (found) ? 0 : -ENOENT;
+}
+
+// Add / update proxy
+int proxy_set(int uplink, const int downlinks[], size_t downlinks_cnt, enum proxy_flags flags)
+{
+       struct proxy *proxy = NULL, *p;
+       list_for_each_entry(p, &proxies, head)
+               if (proxy->ifindex == uplink)
+                       proxy = p;
+
+       if (proxy && (downlinks_cnt == 0 ||
+                       ((proxy->flags & _PROXY_SCOPEMASK) != (flags & _PROXY_SCOPEMASK)))) {
+               proxy_unset(proxy);
+               proxy = NULL;
+       }
+
+       if (downlinks_cnt <= 0)
+               return 0;
+
+       if (!proxy) {
+               if (!(proxy = calloc(1, sizeof(*proxy))))
+                       return -ENOMEM;
+
+               if ((flags & _PROXY_SCOPEMASK) == 0)
+                       flags |= PROXY_GLOBAL;
+
+               proxy->flags = flags;
+               proxy->ifindex = uplink;
+               querier_init(&proxy->querier);
+               list_add(&proxy->head, &proxies);
+               if (mrib_attach_user(&proxy->mrib, uplink, proxy_mrib))
+                       goto err;
+       }
+
+       struct querier_user *user, *n;
+       list_for_each_entry_safe(user, n, &proxy->querier.ifaces, head) {
+               struct querier_user_iface *iface = container_of(user, struct querier_user_iface, user);
+
+               size_t i;
+               for (i = 0; i < downlinks_cnt && downlinks[i] == iface->iface->ifindex; ++i);
+                       if (i == downlinks_cnt)
+                               proxy_remove_downlink(container_of(iface, struct proxy_downlink, iface));
+       }
+
+       for (size_t i = 0; i < downlinks_cnt; ++i) {
+               bool found = false;
+               struct querier_user *user;
+               list_for_each_entry(user, &proxy->querier.ifaces, head) {
+                       struct querier_user_iface *iface = container_of(user, struct querier_user_iface, user);
+                       if (iface->iface->ifindex == downlinks[i]) {
+                               found = true;
+                               break;
+                       }
+               }
+
+               if (found)
+                       continue;
+
+               struct proxy_downlink *downlink = calloc(1, sizeof(*downlink));
+               if (!downlink)
+                       goto err;
+
+               if (client_init(&downlink->client, uplink))
+                       goto downlink_err3;
+
+               if (mrib_attach_user(&downlink->mrib, downlinks[i], NULL))
+                       goto downlink_err2;
+
+               if (querier_attach(&downlink->iface, &proxy->querier, downlinks[i], proxy_trigger))
+                       goto downlink_err1;
+
+               downlink->flags = proxy->flags;
+               continue;
+
+downlink_err1:
+               mrib_detach_user(&downlink->mrib);
+downlink_err2:
+               client_deinit(&downlink->client);
+downlink_err3:
+               free(downlink);
+               goto err;
+       }
+
+       return 0;
+
+err:
+       proxy_unset(proxy);
+       return -errno;
+}
+
+// Mark all flushable proxies as unused
+void proxy_update(bool all)
+{
+       struct proxy *proxy;
+       list_for_each_entry(proxy, &proxies, head)
+               if (all || (proxy->flags & PROXY_FLUSHABLE))
+                       proxy->flags |= _PROXY_UNUSED;
+}
+
+
+// Flush all unused proxies
+void proxy_flush(void)
+{
+       proxy_unset(NULL);
+}
diff --git a/src/proxy.h b/src/proxy.h
new file mode 100644 (file)
index 0000000..a28082d
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015 Steven Barth <steven at midlink.org>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <netinet/in.h>
+
+enum proxy_flags {
+       // minimum scope to proxy (use only one, includes higher scopes)
+       PROXY_REALMLOCAL = 3,
+       PROXY_ADMINLOCAL = 4,
+       PROXY_SITELOCAL = 5,
+       PROXY_ORGLOCAL = 8,
+       PROXY_GLOBAL = 0xe,
+
+       // proxy may be flushed (from static config source)
+       PROXY_FLUSHABLE = 1 << 4,
+
+       // internal values
+       _PROXY_UNUSED = 1 << 5,
+       _PROXY_SCOPEMASK = 0xf,
+};
+
+
+int proxy_set(int uplink, const int downlinks[], size_t downlinks_cnt, enum proxy_flags flags);
+
+
+void proxy_update(bool all);
+void proxy_flush(void);
diff --git a/src/querier.c b/src/querier.c
new file mode 100644 (file)
index 0000000..cf489f9
--- /dev/null
@@ -0,0 +1,276 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <netinet/in.h>
+#include <libubox/ustream.h>
+#include <libubox/usock.h>
+#include <libubox/list.h>
+
+#include "querier.h"
+
+static struct list_head ifaces = LIST_HEAD_INIT(ifaces);
+
+
+// Handle querier update event from a querier-interface
+static void querier_announce_iface(struct querier_user_iface *user, omgp_time_t now, const struct group *group, bool enabled)
+{
+       bool include = true;
+       size_t cnt = 0;
+       struct in6_addr sources[group->source_count];
+
+       if (enabled) {
+               struct group_source *source;
+               group_for_each_active_source(source, group, now)
+                       sources[cnt++] = source->addr;
+
+               include = group_is_included(group, now);
+       }
+
+       if (user->user_cb)
+               user->user_cb(user, &group->addr, include, sources, cnt);
+}
+
+// Handle changes from a querier for a given group (called by a group-state as callback)
+static void querier_announce_change(struct groups *groups, struct group *group, omgp_time_t now)
+{
+       struct querier_iface *iface = container_of(groups, struct querier_iface, groups);
+
+       // Only recognize changes to non-link-local groups
+       struct querier_user_iface *user;
+       list_for_each_entry(user, &iface->users, head)
+               querier_announce_iface(user, now, group, true);
+}
+
+// Send query for a group + sources (called by a group-state as callback)
+static void querier_send_query(struct groups *groups, const struct in6_addr *group,
+               const struct list_head *sources, bool suppress)
+{
+       struct querier_iface *iface = container_of(groups, struct querier_iface, groups);
+       char addrbuf[INET6_ADDRSTRLEN] = "::";
+       inet_ntop(AF_INET6, group, addrbuf, sizeof(addrbuf));
+
+       L_DEBUG("%s: sending %s-specific query for %s on %d (S: %d)", __FUNCTION__,
+                       (!sources) ? "group" : "source", addrbuf, iface->ifindex, suppress);
+
+       bool v4 = IN6_IS_ADDR_V4MAPPED(group);
+       if (v4 && !iface->igmp_other_querier)
+               igmp_send_query(iface, group, sources, suppress);
+       else if (!v4 && !iface->mld_other_querier)
+               mld_send_query(iface, group, sources, suppress);
+}
+
+// Expire interface timers and send queries (called by timer as callback)
+static void querier_iface_timer(struct uloop_timeout *timeout)
+{
+       struct querier_iface *iface = container_of(timeout, struct querier_iface, timeout);
+       omgp_time_t now = omgp_time();
+       omgp_time_t next_event = now + 3600 * OMGP_TIME_PER_SECOND;
+
+       if (iface->igmp_next_query <= now) {
+               // If the other querier is gone, reset interface config
+               if (iface->igmp_other_querier) {
+                       iface->groups.cfg_v4 = iface->cfg;
+                       iface->igmp_other_querier = false;
+               }
+
+               igmp_send_query(iface, NULL, NULL, false);
+               L_DEBUG("%s: sending generic IGMP-query on %d (S: 0)", __FUNCTION__, iface->ifindex);
+
+               if (iface->igmp_startup_tries > 0)
+                       --iface->igmp_startup_tries;
+
+               iface->igmp_next_query = now + ((iface->igmp_startup_tries > 0) ?
+                                               (iface->groups.cfg_v4.query_interval / 4) :
+                                               iface->groups.cfg_v4.query_interval);
+       }
+
+       if (iface->igmp_next_query < next_event)
+               next_event = iface->igmp_next_query;
+
+       if (iface->mld_next_query <= now) {
+               // If the other querier is gone, reset interface config
+               if (iface->mld_other_querier) {
+                       iface->groups.cfg_v6 = iface->cfg;
+                       iface->mld_other_querier = false;
+               }
+
+               mld_send_query(iface, NULL, NULL, false);
+               L_DEBUG("%s: sending generic MLD-query on %d (S: 0)", __FUNCTION__, iface->ifindex);
+
+               if (iface->mld_startup_tries > 0)
+                       --iface->mld_startup_tries;
+
+               iface->mld_next_query = now + ((iface->mld_startup_tries > 0) ?
+                                               (iface->groups.cfg_v6.query_interval / 4) :
+                                               iface->groups.cfg_v6.query_interval);
+       }
+
+       if (iface->mld_next_query < next_event)
+               next_event = iface->mld_next_query;
+
+       uloop_timeout_set(&iface->timeout, (next_event > now) ? next_event - now : 0);
+}
+
+
+// Calculate QQI from QQIC
+int querier_qqi(uint8_t qqic)
+{
+       return (qqic & 0x80) ? (((qqic & 0xf) | 0x10) << (((qqic >> 4) & 0x7) + 3)) : qqic;
+}
+
+// Calculate MRD from MRC
+int querier_mrd(uint16_t mrc)
+{
+       mrc = ntohs(mrc);
+       return (mrc & 0x8000) ? (((mrc & 0xfff) | 0x1000) << (((mrc >> 12) & 0x7) + 3)) : mrc;
+}
+
+// Calculate QQIC from QQI
+uint8_t querier_qqic(int qqi)
+{
+       if (qqi >= 128) {
+               int exp = 3;
+
+               while ((qqi >> exp) > 0x1f && exp <= 10)
+                       ++exp;
+
+               if (exp > 10)
+                       qqi = 0xff;
+               else
+                       qqi = 0x80 | ((exp - 3) << 4) | ((qqi >> exp) & 0xf);
+       }
+       return qqi;
+}
+
+// Calculate MRC from MRD
+uint16_t querier_mrc(int mrd)
+{
+       if (mrd >= 32768) {
+               int exp = 3;
+
+               while ((mrd >> exp) > 0x1fff && exp <= 10)
+                       ++exp;
+
+               if (exp > 10)
+                       mrd = 0xffff;
+               else
+                       mrd = 0x8000 | ((exp - 3) << 12) | ((mrd >> exp) & 0xfff);
+       }
+       return htons(mrd);
+}
+
+// Attach an interface to a querier-instance
+int querier_attach(struct querier_user_iface *user,
+               struct querier *querier, int ifindex, querier_iface_cb *cb)
+{
+       struct querier_iface *c, *iface = NULL;
+       list_for_each_entry(c, &ifaces, head) {
+               if (c->ifindex == ifindex) {
+                       iface = c;
+                       break;
+               }
+       }
+
+       omgp_time_t now = omgp_time();
+       int res = 0;
+       if (!iface) {
+               if (!(iface = calloc(1, sizeof(*iface)))) {
+                       res = -errno;
+                       goto out;
+               }
+
+               list_add(&iface->head, &ifaces);
+               INIT_LIST_HEAD(&iface->users);
+
+               iface->ifindex = ifindex;
+               iface->timeout.cb = querier_iface_timer;
+               uloop_timeout_set(&iface->timeout, 0);
+
+               groups_init(&iface->groups);
+               iface->groups.source_limit = QUERIER_MAX_SOURCE;
+               iface->groups.group_limit = QUERIER_MAX_GROUPS;
+               iface->groups.cb_update = querier_announce_change;
+               iface->groups.cb_query = querier_send_query;
+               iface->cfg = iface->groups.cfg_v6;
+               iface->igmp_startup_tries = iface->groups.cfg_v4.robustness;
+               iface->mld_startup_tries = iface->groups.cfg_v6.robustness;
+
+               if ((res = mrib_attach_querier(&iface->mrib, ifindex, igmp_handle, mld_handle)))
+                       goto out;
+       }
+
+out:
+       if (iface) {
+               list_add(&user->head, &iface->users);
+               user->iface = iface;
+
+               list_add(&user->user.head, &querier->ifaces);
+               user->user_cb = cb;
+               user->user.querier = querier;
+               user->user.groups = &iface->groups;
+
+               struct group *group;
+               groups_for_each_group(group, &iface->groups)
+                       querier_announce_iface(user, now, group, true);
+       }
+
+       if (res)
+               querier_detach(user);
+       return res;
+}
+
+// Detach an interface from a querier-instance
+void querier_detach(struct querier_user_iface *user)
+{
+       struct querier_iface *iface = user->iface;
+       list_del(&user->user.head);
+       list_del(&user->head);
+
+       omgp_time_t now = omgp_time();
+       struct group *group;
+       groups_for_each_group(group, &iface->groups)
+               querier_announce_iface(user, now, group, false);
+
+       if (list_empty(&iface->users)) {
+               uloop_timeout_cancel(&iface->timeout);
+               groups_deinit(&iface->groups);
+               mrib_detach_querier(&iface->mrib);
+               list_del(&iface->head);
+               free(iface);
+       }
+}
+
+// Initialize querier-instance
+int querier_init(struct querier *querier)
+{
+       memset(querier, 0, sizeof(*querier));
+       INIT_LIST_HEAD(&querier->ifaces);
+       return 0;
+}
+
+// Cleanup querier-instance
+void querier_deinit(struct querier *querier)
+{
+       struct querier_user *user, *n;
+       list_for_each_entry_safe(user, n, &querier->ifaces, head)
+               querier_detach(container_of(user, struct querier_user_iface, user));
+}
diff --git a/src/querier.h b/src/querier.h
new file mode 100644 (file)
index 0000000..ee06989
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Author: Steven Barth <steven at midlink.org>
+ *
+ * Copyright 2015 Deutsche Telekom AG
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+#include <libubox/list.h>
+#include <libubox/uloop.h>
+#include <libubox/avl.h>
+#include <netinet/in.h>
+#include <net/if.h>
+#include <string.h>
+#include <stdbool.h>
+
+#include "mrib.h"
+#include "groups.h"
+
+struct querier_iface {
+       struct list_head head;
+       struct list_head users;
+       struct uloop_timeout timeout;
+       struct groups_config cfg;
+
+       struct uloop_fd igmp_fd;
+       omgp_time_t igmp_next_query;
+       bool igmp_other_querier;
+       int igmp_startup_tries;
+
+       struct uloop_fd mld_fd;
+       omgp_time_t mld_next_query;
+       bool mld_other_querier;
+       int mld_startup_tries;
+
+       struct mrib_querier mrib;
+       struct groups groups;
+       int ifindex;
+};
+
+struct querier;
+struct querier_user;
+struct querier_user_iface;
+
+typedef void (querier_iface_cb)(struct querier_user_iface *user, const struct in6_addr *group,
+               bool include, const struct in6_addr *sources, size_t len);
+
+struct querier_user {
+       struct list_head head;
+       struct groups *groups;
+       struct querier *querier;
+};
+
+struct querier_user_iface {
+       struct list_head head;
+       struct querier_user user;
+       struct querier_iface *iface;
+       querier_iface_cb *user_cb;
+};
+
+
+/* External API */
+int querier_init(struct querier *querier);
+void querier_deinit(struct querier *querier);
+
+int querier_attach(struct querier_user_iface *user, struct querier *querier,
+               int ifindex, querier_iface_cb *cb);
+void querier_detach(struct querier_user_iface *user);
+
+
+/* Internal API */
+
+struct querier {
+       struct list_head ifaces;
+};
+
+#define QUERIER_MAX_SOURCE 75
+#define QUERIER_MAX_GROUPS 256
+#define QUERIER_SUPPRESS (1 << 3)
+
+static inline in_addr_t querier_unmap(const struct in6_addr *addr6)
+{
+       return addr6->s6_addr32[3];
+}
+
+static inline void querier_map(struct in6_addr *addr6, in_addr_t addr4)
+{
+       addr6->s6_addr32[0] = 0;
+       addr6->s6_addr32[1] = 0;
+       addr6->s6_addr32[2] = cpu_to_be32(0xffff);
+       addr6->s6_addr32[3] = addr4;
+}
+
+void querier_announce(struct querier_user *user, omgp_time_t now, const struct group *group, bool enabled);
+void querier_synthesize_events(struct querier *querier);
+
+int querier_qqi(uint8_t qqic);
+int querier_mrd(uint16_t mrc);
+uint8_t querier_qqic(int qi);
+uint16_t querier_mrc(int mrd);
+
+
+void igmp_handle(struct mrib_querier *mrib, const struct igmphdr *igmp, size_t len,
+               const struct sockaddr_in *from);
+int igmp_send_query(struct querier_iface *q,
+               const struct in6_addr *group,
+               const struct list_head *sources,
+               bool suppress);
+
+
+void mld_handle(struct mrib_querier *mrib, const struct mld_hdr *hdr, size_t len,
+               const struct sockaddr_in6 *from);
+ssize_t mld_send_query(struct querier_iface *q,
+               const struct in6_addr *group,
+               const struct list_head *sources,
+               bool suppress);
+