csv_parser.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from data import LocationData
  2. from collections import namedtuple
  3. ColumnSpec = namedtuple("ColumnSpec", "name format")
  4. field_fmt = {
  5. 'lat' : ColumnSpec('latitude', 'f'),
  6. 'lon' : ColumnSpec('longitude', 'f'),
  7. 'eig' : ColumnSpec('eigenvalue', 'f'),
  8. 'prec' : ColumnSpec("precision", 'f'),
  9. 'hwid' : ColumnSpec('bind_number', 's'),
  10. 'yaw' : ColumnSpec('pitch', 'f'),
  11. 'press' : ColumnSpec('airPressure', 'f'),
  12. 'timestamp' : ColumnSpec('trigtime', 'd'),
  13. }
  14. def do_convert(s, fmt):
  15. if fmt == 's':
  16. return s
  17. if fmt == 'f':
  18. return float(s)
  19. if fmt == 'd':
  20. return int(s)
  21. if fmt == 'n':
  22. return "null"
  23. throw("what is this format?")
  24. def data_from_row(d):
  25. d1 = {}
  26. for field in field_fmt:
  27. name = field_fmt[field].name
  28. fmt = field_fmt[field].format
  29. if name in d:
  30. st = d[name]
  31. elif ('\ufeff' + name) in d:
  32. # workaround stupid Windoge BOM
  33. st = d['\ufeff' + name]
  34. else:
  35. st = ''
  36. if st == '' and fmt != 'n':
  37. raise KeyError
  38. d1[field] = do_convert(st, fmt)
  39. if d1['lat'] == 0:
  40. d1['lat'] = None
  41. if d1['lon'] == 0:
  42. d1['lon'] = None
  43. return LocationData(**d1)
  44. def parse_data_from_csv(f):
  45. if type(f) == str:
  46. with open(f, encoding='utf-8') as fd:
  47. return parse_data_from_csv(fd)
  48. from csv import DictReader
  49. rd = DictReader(f)
  50. ret = []
  51. for row in rd:
  52. try:
  53. entry = data_from_row(row)
  54. if entry.yaw < 0:
  55. continue
  56. ret.append(entry)
  57. except KeyError:
  58. print("suspicous line:", row)
  59. return ret
  60. if __name__ == '__main__':
  61. print(parse_data_from_csv("test.csv"))