Files
lib-py-osc/server.py

83 lines
2.3 KiB
Python

import asyncio
import traceback
import pytyuosc.parser as parser
import json
class __OSCServer(asyncio.DatagramProtocol):
''' OSC server '''
def __init__(self, callback, event_loop):
super().__init__()
self.callback = callback
self.loop = event_loop
self.is_cb_async = asyncio.iscoroutinefunction(self.callback)
def datagram_received(self, data, addr):
try:
# try to parse
osc_data = parser.parse(data)
# failed to parse
if osc_data is None:
return
# no callback - debug
if self.callback is None:
print(osc_data)
return
# callback
if self.is_cb_async:
if self.loop is not None:
self.loop.create_task(self.callback(osc_data, addr))
else:
self.callback(osc_data, addr)
except:
traceback.print_exc()
pass
__transport, __protocol = None, None
async def start(host: str, port: int, callback) -> bool:
global __transport, __protocol
try:
# exists
if __transport is not None:
return False
# start
loop = asyncio.get_running_loop()
__transport, __protocol = await loop.create_datagram_endpoint(
lambda: __OSCServer(callback, loop),
local_addr=(host, port)
)
# success
return True
except:
traceback.print_exc()
# serious failure
return False
async def stop() -> None:
global __transport, __protocol
# does not exist
if __transport is None:
return
# close
__transport.close()
__transport, __protocol = None, None
# test
async def __test_cb(data, addr):
print('Received packet from %s:%s:\n%s' % (addr[0], addr[1], json.dumps(data, indent=4)))
async def __main__() -> None:
print('Starting OSC server on 0.0.0.0:23654')
if not await start('0.0.0.0', '23654', __test_cb):
print('Failed')
return
print('Started')
print('The server will print packet content on reception.')
print('GOING TO WORK FOREVER')
await asyncio.Future()
await stop()
print('Stopped')
if __name__ == '__main__':
asyncio.run(__main__())