''' Module that implements OSC '''

import asyncio
import struct
from typing import Optional

# background sender task (None while the module is not started)
_TASK = None
# queue of pending packets; the integer 0 is the "stop" sentinel
_Q = asyncio.Queue()


def __string_to_osc(s: str) -> bytes:
    '''
    Convert python string to OSC-string

    The string is ASCII-encoded and padded with 1 to 4 null bytes, so the
    result is always null-terminated and its length is a multiple of 4.
    Raises UnicodeEncodeError if the string is not pure ASCII.
    '''
    data = s.encode('ascii')
    return data + b'\0' * (4 - len(data) % 4)


def __build_packet(host: str, port: str, addr: str, data: list) -> Optional[bytes]:
    '''
    Build OSC packet from the address and the list of values

    Only values whose exact type is int (packed as big-endian 32-bit) or
    str (packed as OSC-string) are supported. Returns the packet bytes, or
    None if the packet cannot be built; in that case the reason is logged.
    Host and port are used for logging only.
    '''
    # OSC type tag string
    type_tag = ','
    # encoded arguments, joined once at the end
    chunks = []

    try:
        for d in data:
            d_type = type(d)
            # exact type match on purpose: bool is a subclass of int,
            # but must not be sent as an integer
            if d_type is int:
                type_tag += 'i'
                chunks.append(struct.pack('>i', d))
            elif d_type is str:
                type_tag += 's'
                chunks.append(__string_to_osc(d))
            # unsupported
            else:
                print('[!] OSC: unsupported data type was provided! The packet is discarded.')
                print(' * host: %s' % host)
                print(' * port: %s' % port)
                print(' * address: %s' % addr)
                print(' * data: %s' % data)
                print(' (bad type is %s)' % d_type)
                return None

        # address, then type tag, then arguments
        return __string_to_osc(addr) + __string_to_osc(type_tag) + b''.join(chunks)
    except (struct.error, UnicodeEncodeError) as err:
        # int does not fit into 32 bits, or a string is not ASCII
        print('[!] OSC: failed to encode the packet! The packet is discarded.')
        print(' * host: %s' % host)
        print(' * port: %s' % port)
        print(' * address: %s' % addr)
        print(' * data: %s' % data)
        print(' * error: %s' % err)
        return None


async def __serve() -> None:
    '''
    Task that manages OSC sending

    Takes items from the queue until the stop sentinel (0) arrives. Every
    other item is a dict with 'host', 'port', 'addr' and 'data' keys; it is
    encoded and sent as a single UDP datagram. A failure to encode or send
    one packet is logged and does not stop the task.
    '''
    # log
    print('[I] OSC: task is running')

    loop = asyncio.get_running_loop()

    # task loop
    while True:
        # get value from queue
        val = await _Q.get()

        # stop
        if val == 0:
            break

        # host, port, OSC address and data
        host = val['host']
        port = val['port']
        addr = val['addr']
        data = val['data']

        # create OSC packet (the problem is already logged on failure)
        packet = __build_packet(host, port, addr, data)
        if packet is None:
            continue

        # send the packet; endpoint creation is inside the try too, so that
        # a bad host or port does not kill the task
        trans = None
        try:
            trans, _ = await loop.create_datagram_endpoint(
                asyncio.DatagramProtocol,
                remote_addr=(host, port)
            )
            trans.sendto(packet)
            print('[I] OSC: sent data to %s:%s' % (host, port))
        except Exception as err:
            print('[!] OSC: failed to send to %s:%s!' % (host, port))
            print(' * address: %s' % addr)
            print(' * data: %s' % data)
            print(' * error: %s' % err)
        finally:
            # the transport does not exist if endpoint creation failed
            if trans is not None:
                trans.close()

    # log
    print('[I] OSC: task is stopped')


async def start() -> None:
    ''' Starts OSC task (does nothing if it is already started) '''
    global _TASK

    # already started
    if _TASK is not None:
        return

    _TASK = asyncio.create_task(__serve())


async def stop() -> None:
    '''
    Stops OSC task (does nothing if it is not started)

    Packets queued before the call are still processed; the function
    returns once the task has finished.
    '''
    global _TASK

    # not started
    if _TASK is None:
        return

    # stop
    await _Q.put(0)
    try:
        await _TASK
    finally:
        # reset even if the task failed, so the module can be started again
        _TASK = None


async def send(host: str, port: str, addr: str, values: list) -> None:
    '''
    Schedule packet transmission

    The packet is only queued here; encoding and sending happen in the OSC
    task. If the task is not started, the packet is silently dropped.
    '''
    # not started
    if _TASK is None:
        return

    # create
    d = {
        'host': host,
        'port': port,
        'addr': addr,
        'data': values,
    }
    await _Q.put(d)