83 lines
2.3 KiB
Python
83 lines
2.3 KiB
Python
import asyncio
|
|
import traceback
|
|
import pytyuosc.parser as parser
|
|
import json
|
|
|
|
class __OSCServer(asyncio.DatagramProtocol):
    ''' OSC server: parses each received datagram and hands it to a callback.

    callback   -- callable taking (osc_data, addr); may be a plain function,
                  a coroutine function, or None (parsed packets are printed).
    event_loop -- loop used to schedule coroutine callbacks; when None, the
                  loop running at reception time is used instead.
    '''

    def __init__(self, callback, event_loop):
        super().__init__()
        self.callback = callback
        self.loop = event_loop
        self.is_cb_async = asyncio.iscoroutinefunction(self.callback)
        # The event loop only keeps weak references to tasks, so in-flight
        # callback tasks are held here until they finish.
        self._tasks = set()

    def datagram_received(self, data, addr):
        ''' Parse one datagram and dispatch the result to the callback.

        Errors raised by the parser or by a synchronous callback are printed
        and swallowed so that one bad packet does not stop the server.
        '''
        try:
            # try to parse
            osc_data = parser.parse(data)
            # failed to parse
            if osc_data is None:
                return
            # no callback - debug
            if self.callback is None:
                print(osc_data)
                return
            # synchronous callback - call it right away
            if not self.is_cb_async:
                self.callback(osc_data, addr)
                return
            # coroutine callback - schedule it; fall back to the running loop
            # rather than silently dropping the packet when no loop was given
            loop = self.loop if self.loop is not None else asyncio.get_running_loop()
            task = loop.create_task(self.callback(osc_data, addr))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # still propagate to the event loop.
            traceback.print_exc()
|
|
|
|
# Module-level server state: the transport/protocol pair assigned by start()
# from create_datagram_endpoint(); both are None while no server is running.
__transport, __protocol = None, None
|
|
|
|
async def start(host: str, port: int, callback) -> bool:
    ''' Start the module-wide OSC server on (host, port).

    callback is passed to the protocol together with the running loop and
    receives (osc_data, addr) for every parsed packet.

    Returns True when the endpoint was created, False when a server already
    exists or the endpoint could not be created (the traceback is printed).
    '''
    global __transport, __protocol
    # exists
    if __transport is not None:
        return False
    # start
    try:
        loop = asyncio.get_running_loop()
        __transport, __protocol = await loop.create_datagram_endpoint(
            lambda: __OSCServer(callback, loop),
            local_addr=(host, port),
        )
    except Exception:
        # Exception rather than a bare except: task cancellation
        # (CancelledError is a BaseException) must not be reported as a
        # plain start-up failure.
        traceback.print_exc()
        # serious failure
        return False
    # success
    return True
|
|
|
|
async def stop() -> None:
    ''' Close the running server, if any, and reset the module state. '''
    global __transport, __protocol
    if __transport is not None:
        # a server exists - close it and forget the transport/protocol pair
        __transport.close()
        __transport, __protocol = None, None
|
|
|
|
# test
|
|
async def __test_cb(data, addr):
    ''' Demo callback: print the sender address and the packet as indented JSON. '''
    sender_host, sender_port = addr[0], addr[1]
    pretty = json.dumps(data, indent=4)
    print('Received packet from %s:%s:\n%s' % (sender_host, sender_port, pretty))
|
|
|
|
async def __main__() -> None:
    ''' Manual test entry point: run the server on 0.0.0.0:23654 until cancelled. '''
    print('Starting OSC server on 0.0.0.0:23654')
    # port is passed as an int to match start()'s signature
    if not await start('0.0.0.0', 23654, __test_cb):
        print('Failed')
        return
    print('Started')
    print('The server will print packet content on reception.')
    print('GOING TO WORK FOREVER')
    try:
        # never completes on its own; only cancellation (e.g. Ctrl+C under
        # asyncio.run) gets past this await
        await asyncio.Future()
    finally:
        # finally so the server is closed on cancellation as well
        await stop()
        print('Stopped')
|
|
|
|
|
|
# Run the manual test server when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(__main__())