echo_server.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. #!/usr/bin/env python
  2. from erlpack import pack, unpack
  3. import socket
  4. import struct
  5. from string import Template
  6. PORT = 5001
  7. HOST = ''
  8. BACKLOG = 5
  9. file_tmpl = Template("""
  10. from erlpack import Atom, pack, unpack
  11. ${testcases}
  12. """.strip())
  13. testcase_pack_tmpl = Template("""
  14. def test_pack_${name}():
  15. assert pack(${python_rep}) == ${packed_rep}
  16. """)
  17. testcase_unpack_tmpl = Template("""
  18. def test_unpack_${name}():
  19. assert unpack(${packed_rep}) == ${python_rep}
  20. """)
  21. def start_server():
  22. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  23. s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  24. s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  25. s.bind((HOST, PORT))
  26. s.listen(BACKLOG)
  27. client, address = s.accept()
  28. clientfile = client.makefile()
  29. testcases = ''
  30. is_name = True
  31. name = ''
  32. while 1:
  33. size_str = clientfile.read(4)
  34. if not size_str:
  35. break
  36. else:
  37. (size,) = struct.unpack("!L", size_str)
  38. data_str = clientfile.read(size)
  39. term = unpack(data_str)
  40. encoded_term = pack(term)
  41. out_size = len(encoded_term)
  42. clientfile.write(struct.pack("!L", long(out_size)))
  43. clientfile.write(encoded_term)
  44. clientfile.flush()
  45. if is_name:
  46. name = term
  47. else:
  48. testcases += testcase_pack_tmpl.substitute(name=name,
  49. python_rep=repr(term),
  50. packed_rep=repr(encoded_term))
  51. testcases += testcase_unpack_tmpl.substitute(name=name,
  52. python_rep=repr(term),
  53. packed_rep=repr(data_str))
  54. is_name = not is_name
  55. with open('tests/test_generated.py', 'w') as f:
  56. f.write(file_tmpl.substitute(testcases=testcases))
  57. clientfile.close()
  58. client.close()
  59. start_server()