''' Module that implements an asynchronous OSC client.

Packets are queued with send() and transmitted over UDP by a single
background task that is controlled with start() and stop().
'''

import asyncio
import struct

from pytyuosc import packet
#import packet

# When True, progress and failures are reported with print()
_DEBUG = True

# Background sender task, or None while the client is stopped
_TASK = None

# Queue of pending packets. It is created by start() rather than at import
# time because asyncio queues are tied to an event loop, and no loop is
# running yet when the module is imported.
_Q = None

# Unique object put on the queue by stop() to ask the task to exit
_STOP = object()


def __string_to_osc(s: str) -> bytes:
    ''' Convert python string to OSC-string

    The string is encoded as ASCII and padded with 1 to 4 null bytes, so
    the result is always null-terminated and a multiple of 4 bytes long.
    Raises UnicodeEncodeError for non-ASCII input.
    '''

    data = s.encode('ascii')
    # always at least one terminator, even if already 4-byte aligned
    to_append = 4 - (len(data) % 4)
    data += b'\0' * to_append
    return data


def __report_failure(host, port, addr, data, reason=None) -> None:
    ''' Print details of a packet that could not be sent (debug only) '''

    if not _DEBUG:
        return

    print('[!] OSC Client: failed to send to %s:%s!' % (host, port))
    print(' * address: %s' % addr)
    print(' * data: %s' % data)
    if reason is not None:
        print(' * reason: %r' % (reason,))


async def __serve() -> None:
    ''' Task that manages OSC sending

    Takes packet descriptions (dicts with 'host', 'port', 'addr' and
    'data' keys) from the queue one by one, serializes them and sends
    each one through a short-lived UDP endpoint. A packet that cannot be
    serialized or sent is dropped; the task itself keeps running until
    the stop sentinel is received.
    '''

    # bind once: start() created this queue right before creating the task
    queue = _Q
    loop = asyncio.get_running_loop()

    # log
    if _DEBUG:
        print('[I] OSC Client: task is running')

    # task loop
    while True:
        # get value from queue
        val = await queue.get()

        # stop
        if val is _STOP:
            break

        # host, port, OSC address and data
        host = val['host']
        port = val['port']
        addr = val['addr']
        data = val['data']

        # build the packet; skip it if it cannot be serialized
        try:
            data_to_send = packet.serialize(addr, data, _DEBUG)
        except Exception as err:
            __report_failure(host, port, addr, data, err)
            continue
        if data_to_send is None:
            continue

        # open the endpoint; a bad host/port must not kill the whole task
        try:
            trans, _ = await loop.create_datagram_endpoint(
                asyncio.DatagramProtocol,
                remote_addr=(host, port),
                allow_broadcast=True
            )
        except asyncio.CancelledError:
            # never swallow cancellation (it is an Exception on Python 3.7)
            raise
        except Exception as err:
            __report_failure(host, port, addr, data, err)
            continue

        # send the packet
        try:
            trans.sendto(data_to_send)
            if _DEBUG:
                print('[I] OSC Client: sent data to %s:%s' % (host, port))
        except Exception as err:
            __report_failure(host, port, addr, data, err)
        finally:
            trans.close()

    # log
    if _DEBUG:
        print('[I] OSC Client: task is stopped')


async def start() -> None:
    ''' Starts OSC task

    Does nothing if the task is already started. Must be called from a
    running event loop.
    '''

    global _TASK, _Q

    # already started
    if _TASK is not None:
        return

    # fresh queue for every run, created inside the running loop
    _Q = asyncio.Queue()
    _TASK = asyncio.create_task(__serve())


async def stop() -> None:
    ''' Stops OSC task

    Packets queued before this call are still processed; packets queued
    after it are discarded. Does nothing if the task is not started.
    Re-raises the exception of the task if it terminated abnormally.
    '''

    global _TASK

    # not started
    if _TASK is None:
        return

    # stop
    await _Q.put(_STOP)
    try:
        await _TASK
    finally:
        # reset even if the task failed, so start() can be called again
        _TASK = None


async def send(host: str, port: int, addr: str, values: list) -> None:
    ''' Schedule packet transmission

    host   -- destination host name or IP address
    port   -- destination UDP port
    addr   -- OSC address of the message
    values -- list of OSC arguments passed to the serializer

    The request is silently ignored if the task is not running.
    '''

    # not started (or the task has already terminated)
    if _TASK is None or _TASK.done():
        return

    # create
    d = {
        'host': host,
        'port': port,
        'addr': addr,
        'data': values
    }

    await _Q.put(d)


#
# TEST
#

async def __test__() -> None:
    ''' Manual self-test: broadcasts a single OSC message to port 8000 '''

    print('Going to send data to 255.255.255.255:8000')

    await start()
    await send('255.255.255.255', 8000,
               '/pytyuosc/osc/testing/is/in/progress',
               ['a', 'b', 1, 2, 3.4, 5.6])
    await stop()

    print('Data must have been sent')


if __name__ == '__main__':
    asyncio.run(__test__())