Initial commit
This commit is contained in:
98
parser.py
Normal file
98
parser.py
Normal file
@@ -0,0 +1,98 @@
|
||||
''' Parse OSC packet into dict '''
|
||||
|
||||
# set True to enable log messages
|
||||
DEBUG = True
|
||||
|
||||
import struct
|
||||
import traceback
|
||||
|
||||
def _parse_str(body: bytearray) -> (str, int):
|
||||
''' Parse OSC string '''
|
||||
try:
|
||||
result = ''
|
||||
for b in body:
|
||||
if b == 0:
|
||||
break
|
||||
result += chr(b)
|
||||
else:
|
||||
return None
|
||||
l = ((len(result) // 4) + 1) * 4
|
||||
if l > len(body):
|
||||
return None
|
||||
return (result, l)
|
||||
except:
|
||||
if DEBUG: traceback.print_exc()
|
||||
return None
|
||||
|
||||
def _parse_int(body: bytearray) -> (int, int):
|
||||
''' Parse OSC int '''
|
||||
try:
|
||||
return (struct.unpack('>i', body[0:4])[0], 4)
|
||||
except:
|
||||
if DEBUG: traceback.print_exc()
|
||||
return None
|
||||
|
||||
def _parse_float(body: bytearray) -> (float, int):
|
||||
''' Parse OSC float '''
|
||||
try:
|
||||
return (struct.unpack('>f', body[0:4])[0], 4)
|
||||
except:
|
||||
if DEBUG: traceback.print_exc()
|
||||
return None
|
||||
|
||||
def parse(body: bytearray) -> dict | None:
|
||||
''' Parse OSC datagram into dict. '''
|
||||
# too short
|
||||
if len(body) < 8:
|
||||
if DEBUG: print('[OSC PARSER] Body is shorter than 8 bytes')
|
||||
return None
|
||||
# does not start with slash
|
||||
if body[0] != ord('/'):
|
||||
if DEBUG: print('[OSC PARSER] First byte of packet is not forward slash')
|
||||
return None
|
||||
# get the address
|
||||
addr = _parse_str(body)
|
||||
if not addr:
|
||||
if DEBUG: print('[OSC PARSER] Failed to parse address (OSC-string)')
|
||||
return None
|
||||
# edit body
|
||||
body = body[addr[1]:]
|
||||
# get the type tag
|
||||
type_tag = _parse_str(body)
|
||||
if not type_tag:
|
||||
if DEBUG: print('[OSC PARSER] Failed to parse type tag (OSC-string)')
|
||||
return None
|
||||
# edit body
|
||||
body = body[type_tag[1]:]
|
||||
# type is invalid
|
||||
if type_tag[0][0] != ',':
|
||||
if DEBUG: print('[OSC PARSER] The first symbol of type tag must be comma')
|
||||
return None
|
||||
# arguments
|
||||
args = []
|
||||
# type parsers
|
||||
type_parsers = {
|
||||
'i': _parse_int,
|
||||
'f': _parse_float,
|
||||
's': _parse_str
|
||||
}
|
||||
# parse the arguments
|
||||
for arg_type in type_tag[0][1:]:
|
||||
# parse body if parser exists
|
||||
if arg_type not in type_parsers:
|
||||
if DEBUG: print('[OSC PARSER] Argument of type \'%s\' can\'t be parsed by this library' % arg_type)
|
||||
return None
|
||||
parse_result = type_parsers[arg_type](body)
|
||||
# failed to parse
|
||||
if parse_result is None:
|
||||
if DEBUG: print('[OSC PARSER] Failed to parse an argument of type \'%s\'' % arg_type)
|
||||
return None
|
||||
# success
|
||||
args.append(parse_result[0])
|
||||
body = body[parse_result[1]:]
|
||||
# result
|
||||
return {
|
||||
'addr': addr[0],
|
||||
'type_tag': type_tag[0],
|
||||
'args': args
|
||||
}
|
||||
Reference in New Issue
Block a user