From 8b9f5ec5cf44b9b586849b675a05f48a55d76ff9 Mon Sep 17 00:00:00 2001 From: Marek Sebera Date: Sat, 12 Mar 2022 01:09:01 +0100 Subject: [PATCH] added pi-header placeholder for consuming and producing crc-ccit and bptc196,96 protected payload per-etsi-specs, add hytera MFID, add VBPTC32,11 with tests to extract and (de)interleave single-burst-variable-bptc data --- okdmr/dmrlib/etsi/fec/vbptc_32_11.py | 232 ++++++++++++++++++ okdmr/dmrlib/etsi/layer2/burst.py | 3 + .../etsi/layer2/elements/feature_set_ids.py | 1 + okdmr/dmrlib/etsi/layer2/pdu/pi_header.py | 34 +++ okdmr/dmrlib/tools/pcap_tool.py | 16 +- okdmr/tests/dmrlib/etsi/crc/test_crc16.py | 2 + .../tests/dmrlib/etsi/fec/test_vbptc_32_11.py | 37 +++ okdmr/tests/dmrlib/etsi/layer2/test_burst.py | 13 +- 8 files changed, 334 insertions(+), 4 deletions(-) create mode 100644 okdmr/dmrlib/etsi/fec/vbptc_32_11.py create mode 100644 okdmr/dmrlib/etsi/layer2/pdu/pi_header.py create mode 100644 okdmr/tests/dmrlib/etsi/fec/test_vbptc_32_11.py diff --git a/okdmr/dmrlib/etsi/fec/vbptc_32_11.py b/okdmr/dmrlib/etsi/fec/vbptc_32_11.py new file mode 100644 index 0000000..65aa4e1 --- /dev/null +++ b/okdmr/dmrlib/etsi/fec/vbptc_32_11.py @@ -0,0 +1,232 @@ +from typing import Dict, Tuple + +import numpy +from bitarray import bitarray +from okdmr.dmrlib.etsi.fec.hamming_16_11_4 import Hamming16114 + + +class VBPTC3211: + """ + ETSI TS 102 361-1 V2.5.1 (2017-10) - B.2.2 Single Burst Variable length BPTC + """ + + # disable formatter for this whole table, as manual formatting is applied + # fmt: off + # @formatter:off + INTERLEAVING_INDICES: Dict[int, Tuple[int, int, int, bool, bool]] = { + # (key) index => (value) interleave index, row, column, is row hamming, is parity check bit + # rows are numbered from 1 to match the documentation/specification + + # Row 1 of table, starts with SB(10)/RC(10) + 0: (0, 1, 0, False, False), + 1: (2, 1, 1, False, False), + 2: (4, 1, 2, False, False), + 3: (6, 1, 3, False, False), + 4: (8, 1, 4, False, False), + 5: (10, 1, 5, False, False), + 6: (12, 1, 6, False, False), + 7: (14, 1, 7, False, False), + 8: (16, 1, 8, False, False), + 9: (18, 1, 9, False, False), + 10: (20, 1, 10, False, False), + # Row 1 hamming bits, starts with H1(4) + 11: (22, 1, 11, True, False), + 12: (24, 1, 12, True, False), + 13: (26, 1, 13, True, False), + 14: (28, 1, 14, True, False), + 15: (30, 1, 15, True, False), + + # Row 4 of table, starts with PC(15) + 16: (17, 2, 0, False, True), + 17: (19, 2, 1, False, True), + 18: (21, 2, 2, False, True), + 19: (23, 2, 3, False, True), + 20: (25, 2, 4, False, True), + 21: (27, 2, 5, False, True), + 22: (29, 2, 6, False, True), + 23: (31, 2, 7, False, True), + 24: (1, 2, 8, False, True), + 25: (3, 2, 9, False, True), + 26: (5, 2, 10, False, True), + 27: (7, 2, 11, False, True), + 28: (9, 2, 12, False, True), + 29: (11, 2, 13, False, True), + 30: (13, 2, 14, False, True), + 31: (15, 2, 15, False, True), + } + """Interleave table as key(index) => value(interleave index, row, column, is reserved, is hamming)""" + # @formatter:on + # fmt: on + + FULL_INTERLEAVING_MAP: Dict[int, int] = dict( + (k, v[0]) for k, v in INTERLEAVING_INDICES.items() + ) + """Extract only (table index -> interleave index)""" + FULL_DEINTERLEAVING_MAP: Dict[int, int] = dict( + (v[0], k) for k, v in INTERLEAVING_INDICES.items() + ) + """Extract only (interleave index -> index)""" + DEINTERLEAVE_INFO_BITS_ONLY_MAP: Dict[int, int] = dict( + (i, l) + for i, l in enumerate( + dict( + (idx, interleave_idx) + for idx, ( + interleave_idx, + row, + col, + is_hamming, + is_parity, + ) in INTERLEAVING_INDICES.items() + if not is_hamming and not is_parity # not parity bits or hamming + ).values() + ) + ) + """Extract only (interleave index -> index) where it's not reserved or hamming bit""" + INTERLEAVE_INFO_BITS_ONLY_MAP: Dict[int, int] = dict( + (i, l) + for i, l in enumerate( + dict( + (interleave_idx, idx) + for idx, ( + interleave_idx, + row, + col, + is_hamming, + is_parity, + ) in INTERLEAVING_INDICES.items() + if not is_hamming and not is_parity + ).values() + ) + ) + """Extract only (index -> interleave index) where it's not reserved or hamming bit""" + + @staticmethod + def deinterleave_all_bits(bits: bitarray) -> bitarray: + """ + Will take BPTC interleaved (and FEC protected) bits and return 11 bits of deinterleaved bits + :param bits: 32 bits of on-air payload + :return: + """ + assert ( + len(bits) == 32 + ), f"VBPTC 31,11 deinterleave_all_bits requires 32 bits, got {len(bits)}" + mapping = VBPTC3211.FULL_DEINTERLEAVING_MAP + + out = bitarray([0] * len(mapping), endian="big") + for i, n in mapping.items(): + out[i] = bits[n] + + return out + + @staticmethod + def deinterleave_data_bits(bits: bitarray) -> bitarray: + """ + Will take BPTC interleaved (and FEC protected) bits and return 11bits of data + :param bits: 32 bits of on-air payload + :return: bitarray with 11 (data bits) + """ + assert len(bits) == 32, f"VBPTC 32,11 decode requires 32 bits, got {len(bits)}" + mapping = VBPTC3211.DEINTERLEAVE_INFO_BITS_ONLY_MAP + + out = bitarray([0] * len(mapping.keys()), endian="big") + for i, n in mapping.items(): + out[i] = bits[n] + + return out + + @staticmethod + def make_encoding_table() -> numpy.ndarray: + # create table 4 rows, 17 columns, for FEC encoding + table: numpy.ndarray = numpy.ndarray(shape=(2, 16), dtype=int) + table.fill(0) + + return table + + @staticmethod + def fill_encoding_table( + table: numpy.ndarray, bits_deinterleaved: bitarray + ) -> numpy.ndarray: + assert ( + len(bits_deinterleaved) == 11 or len(bits_deinterleaved) == 32 + ), f"Can fill encoding table only with data bits (len 11) or full bits (len 32), got {len(bits_deinterleaved)}" + + # make bitarray of size 32, fill with provided bits + mapping = ( + VBPTC3211.DEINTERLEAVE_INFO_BITS_ONLY_MAP + if len(bits_deinterleaved) == 11 + else VBPTC3211.FULL_DEINTERLEAVING_MAP + ) + bits_interleaved: bitarray = bitarray([0] * 32, endian="big") + + for index, interleave_index in mapping.items(): + bits_interleaved[interleave_index] = bits_deinterleaved[index] + + for data_index, ( + interleave_idx, + row_no, + col_no, + is_hamming, + is_parity, + ) in VBPTC3211.INTERLEAVING_INDICES.items(): + table[row_no - 1][col_no] = bits_interleaved[interleave_idx] + + return table + + @staticmethod + def encode(bits_deinterleaved: bitarray) -> bitarray: + """ + Takes 11 bits of data (info bits) and return interleaved and FEC protected 32 bits + :param bits_deinterleaved: + :return: + """ + if len(bits_deinterleaved) == 32: + # full deinterleaved data including hamming and parity + # interleave again and deinterleave only data bits + interleaved: bitarray = bitarray([0] * 32) + for data_index, ( + interleave_index, + _, + _, + _, + _, + ) in VBPTC3211.INTERLEAVING_INDICES.items(): + interleaved[data_index] = bits_deinterleaved[interleave_index] + bits_deinterleaved = VBPTC3211.deinterleave_data_bits(interleaved) + + assert ( + len(bits_deinterleaved) == 11 + ), f"Unexpected number of bits fed to VBPTC3211.encode, expected 11 or 32, got {len(bits_deinterleaved)}" + + table: numpy.ndarray = VBPTC3211.make_encoding_table() + table = VBPTC3211.fill_encoding_table( + table=table, bits_deinterleaved=bits_deinterleaved + ) + + # fill row 0 with hamming + for row in range(0, 1): + table[row] = Hamming16114.generate(table[row][:11]) + + # fill columns with parity bit + for column in range(0, 16): + table[:, column] = VBPTC3211.set_parity(table[:, column]) + + out: bitarray = bitarray([0] * 32) + for index, ( + interleave_index, + row, + col, + is_hamming, + is_parity, + ) in VBPTC3211.INTERLEAVING_INDICES.items(): + out[interleave_index] = table[row - 1][col] + + return out + + @staticmethod + def set_parity(column: numpy.ndarray) -> numpy.ndarray: + assert len(column) in (1, 2) + if len(column) == 1: + column = numpy.append(column, [0]) + column[1] = column[0] + return column diff --git a/okdmr/dmrlib/etsi/layer2/burst.py b/okdmr/dmrlib/etsi/layer2/burst.py index 079575e..e04d5ad 100644 --- a/okdmr/dmrlib/etsi/layer2/burst.py +++ b/okdmr/dmrlib/etsi/layer2/burst.py @@ -13,6 +13,7 @@ from okdmr.dmrlib.etsi.layer2.pdu.data_header import DataHeader from okdmr.dmrlib.etsi.layer2.pdu.embedded_signalling import EmbeddedSignalling from okdmr.dmrlib.etsi.layer2.pdu.full_link_control import FullLinkControl +from okdmr.dmrlib.etsi.layer2.pdu.pi_header import PIHeader from okdmr.dmrlib.etsi.layer2.pdu.rate12_data import Rate12Data from okdmr.dmrlib.etsi.layer2.pdu.rate34_data import Rate34Data from okdmr.dmrlib.etsi.layer2.pdu.slot_type import SlotType @@ -97,6 +98,8 @@ def extract_data(self) -> Optional[BitsInterface]: return CSBK.from_bits(self.info_bits_deinterleaved) elif self.data_type == DataTypes.VoiceLCHeader: return FullLinkControl.from_bits(self.info_bits_deinterleaved) + elif self.data_type == DataTypes.PIHeader: + return PIHeader.from_bits(self.info_bits_deinterleaved) elif self.data_type == DataTypes.TerminatorWithLC: return FullLinkControl.from_bits(self.info_bits_deinterleaved) elif self.data_type == DataTypes.DataHeader: diff --git a/okdmr/dmrlib/etsi/layer2/elements/feature_set_ids.py b/okdmr/dmrlib/etsi/layer2/elements/feature_set_ids.py index c382cd4..11c463b 100644 --- a/okdmr/dmrlib/etsi/layer2/elements/feature_set_ids.py +++ b/okdmr/dmrlib/etsi/layer2/elements/feature_set_ids.py @@ -16,6 +16,7 @@ class FeatureSetIDs(BitsInterface, enum.Enum): StandardizedFID = 0b00000000 ReservedForFutureStandardization = 0b00000001 ManufacturerFID = 0b00000100 + HyteraFID = 0b00010000 ReservedForFutureMFID = 0b10000000 @classmethod diff --git a/okdmr/dmrlib/etsi/layer2/pdu/pi_header.py b/okdmr/dmrlib/etsi/layer2/pdu/pi_header.py new file mode 100644 index 0000000..a33c4e1 --- /dev/null +++ b/okdmr/dmrlib/etsi/layer2/pdu/pi_header.py @@ -0,0 +1,34 @@ +from typing import Union + +from bitarray import bitarray +from bitarray.util import ba2int, int2ba +from okdmr.dmrlib.etsi.crc.crc16 import CRC16 +from okdmr.dmrlib.etsi.layer2.elements.crc_masks import CrcMasks + +from okdmr.dmrlib.utils.bits_bytes import bits_to_bytes, bytes_to_bits +from okdmr.dmrlib.utils.bits_interface import BitsInterface + + +class PIHeader(BitsInterface): + def __init__(self, data: bytes, crc: Union[int, bytes] = 0): + self.data: bytes = data + self.crc: int = self.calculate_crc() + self.crc_ok: bool = self.crc == ( + crc if isinstance(crc, int) else int.from_bytes(crc, byteorder="big") + ) + + def calculate_crc(self) -> int: + return CRC16.calculate(data=self.data, mask=CrcMasks.PiHeader) + + def __repr__(self): + return ( + f"[PI Header] [Data({len(self.data)}) {self.data.hex()} {bytes_to_bits(self.data)}]" + + ("" if self.crc_ok else " [CRC16-CCIT INVALID]") + ) + + @staticmethod + def from_bits(bits: bitarray) -> "PIHeader": + return PIHeader(data=bits_to_bytes(bits[:-16]), crc=ba2int(bits[-16:])) + + def as_bits(self) -> bitarray: + return bytes_to_bits(self.data) + int2ba(self.crc, length=16) diff --git a/okdmr/dmrlib/tools/pcap_tool.py b/okdmr/dmrlib/tools/pcap_tool.py index 6cc8bef..9ec4d95 100644 --- a/okdmr/dmrlib/tools/pcap_tool.py +++ b/okdmr/dmrlib/tools/pcap_tool.py @@ -6,9 +6,11 @@ from typing import Callable, List, Dict, Optional, Tuple from bitarray import bitarray +from kaitaistruct import KaitaiStruct from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020 from okdmr.kaitai.hytera.ip_site_connect_heartbeat import IpSiteConnectHeartbeat from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol +from okdmr.tests.dmrlib.tests_utils import prettyprint from scapy.data import UDP_SERVICES from scapy.layers.inet import UDP, IP from scapy.layers.l2 import Ether @@ -20,7 +22,6 @@ from okdmr.dmrlib.etsi.layer2.elements.preemption_power_indicator import ( PreemptionPowerIndicator, ) -from okdmr.dmrlib.etsi.layer2.elements.sync_patterns import SyncPatterns from okdmr.dmrlib.etsi.layer2.pdu.full_link_control import FullLinkControl from okdmr.dmrlib.utils.bits_bytes import bytes_to_bits, byteswap_bytes from okdmr.dmrlib.utils.parsing import try_parse_packet @@ -38,6 +39,15 @@ def process_packet(self, data: bytes, packet: IP) -> Optional[FullLinkControl]: burst: Optional[Burst] = PcapTool.debug_packet( data=data, packet=packet, hide_unknown=True, silent=True ) + if ( + burst + and burst.has_emb + and burst.emb.link_control_start_stop == LCSS.SingleFragmentLCorCSBK + ): + print( + f"Single burst data for VBPTC 32,11 [{burst.emb.preemption_and_power_control_indicator}] {burst.embedded_signalling_bits}" + ) + if ( not burst or not burst.has_emb @@ -127,7 +137,6 @@ def debug_packet( dmr_bytes = byteswap_bytes(pkt.ipsc_payload)[:-1] if burst.as_bits() != bytes_to_bits(dmr_bytes): print(f"as_bits no match {dmr_bytes.hex()}") - exit() elif isinstance(pkt, Mmdvm2020): if isinstance(pkt.command_data, Mmdvm2020.TypeDmrData): burst: Burst = Burst.from_mmdvm(pkt.command_data) @@ -138,7 +147,6 @@ def debug_packet( ) if burst.as_bits() != bytes_to_bits(pkt.command_data.dmr_data): print(f"as_bits no match {pkt.command_data.dmr_data.hex()}") - exit() elif isinstance(pkt, IpSiteConnectHeartbeat): pass elif not hide_unknown and not silent: @@ -151,6 +159,8 @@ def debug_packet( else f" type {str(type(pkt)).rsplit('.')[-1]}" ) ) + if isinstance(pkt, KaitaiStruct): + prettyprint(pkt) return burst diff --git a/okdmr/tests/dmrlib/etsi/crc/test_crc16.py b/okdmr/tests/dmrlib/etsi/crc/test_crc16.py index 13565d1..cfe246a 100644 --- a/okdmr/tests/dmrlib/etsi/crc/test_crc16.py +++ b/okdmr/tests/dmrlib/etsi/crc/test_crc16.py @@ -13,6 +13,8 @@ def test_crc16(): ("4da323383b23383b0560", "8040", CrcMasks.DataHeader), # csbk ("bd0080180008fd23383b", "b2ed", CrcMasks.CSBK), + # hytera pi header contents + ("211002177afc73000009", "0dda", CrcMasks.PiHeader) ] # fmt:on # @formatter:on diff --git a/okdmr/tests/dmrlib/etsi/fec/test_vbptc_32_11.py b/okdmr/tests/dmrlib/etsi/fec/test_vbptc_32_11.py new file mode 100644 index 0000000..42153c8 --- /dev/null +++ b/okdmr/tests/dmrlib/etsi/fec/test_vbptc_32_11.py @@ -0,0 +1,37 @@ +from typing import List + +from bitarray import bitarray +from okdmr.dmrlib.etsi.fec.vbptc_32_11 import VBPTC3211 + + +def test_vbptc_sanity(): + assert len(VBPTC3211.FULL_DEINTERLEAVING_MAP) == 32 + assert len(VBPTC3211.DEINTERLEAVE_INFO_BITS_ONLY_MAP) == 11 + assert len(VBPTC3211.INTERLEAVING_INDICES) == 32 + assert len(VBPTC3211.INTERLEAVE_INFO_BITS_ONLY_MAP) == 11 + assert len(VBPTC3211.FULL_INTERLEAVING_MAP) == 32 + + +def test_encode_decode_vbptc(): + bursts: List[(str,)] = [ + ("00000100010110000000100010100100",), + ] + + for (burst,) in bursts: + on_air_bits = bitarray(burst) + + deinterleaved_all_bits = VBPTC3211.deinterleave_all_bits(on_air_bits) + assert len(deinterleaved_all_bits) == 32 + + deinterleaved_info_bits = VBPTC3211.deinterleave_data_bits(on_air_bits) + assert len(deinterleaved_info_bits) == 11 + + deinterleaved_data_bits = VBPTC3211.deinterleave_data_bits(on_air_bits) + + encoded_all_bits = VBPTC3211.encode(deinterleaved_all_bits) + encoded_data_bits = VBPTC3211.encode(deinterleaved_data_bits) + encoded_info_bits = VBPTC3211.encode(deinterleaved_info_bits) + + assert encoded_all_bits == encoded_data_bits + assert encoded_data_bits == encoded_info_bits + assert encoded_info_bits == on_air_bits diff --git a/okdmr/tests/dmrlib/etsi/layer2/test_burst.py b/okdmr/tests/dmrlib/etsi/layer2/test_burst.py index 32027fa..ed5923c 100644 --- a/okdmr/tests/dmrlib/etsi/layer2/test_burst.py +++ b/okdmr/tests/dmrlib/etsi/layer2/test_burst.py @@ -114,6 +114,15 @@ def test_burst_as_bits(): "00e527076f2a4b0f03ed010115ddff57d75df5d6f145422817d6234b6e08802018", BurstTypes.DataAndControl, ), + ( + "015149880ba01b3816406c80c46d5d7f77fd757e32990118206005a02341391033", + BurstTypes.DataAndControl, + ), + # this one does not match for valid reason, crc is nulled out, don't know why hytera sends voice lc header without set crc + # ( + # "015149880ba01b3816406c80c46d5d7f77fd757e32990118206005a02341391000", + # BurstTypes.DataAndControl + # ) ] for (hexstr, burst_type) in bursts: _bytes: bytes = bytes.fromhex(hexstr) @@ -121,7 +130,9 @@ def test_burst_as_bits(): assert ( b.data.as_bits() == b.info_bits_deinterleaved ), f"Mismatch in {repr(b)} {b.as_bits().tobytes().hex()}" - assert b.as_bits().tobytes() == _bytes, f"Mismatch in {repr(b)}" + assert ( + b.as_bits().tobytes() == _bytes + ), f"Mismatch in {repr(b)} {b.as_bits().tobytes().hex()} {_bytes.hex()}" if __name__ == "__main__":