import sys
from collections import namedtuple
from itertools import groupby
from math import pi, sin, cos

import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from sklearn import metrics

import csv_parser
import recog
import metric
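
# Pipeline: parse the raw records from pitchtest0730.csv, group them per device
# (hwid) and let recog.recognize_entries turn each device's time-sorted records
# into "entry" events carrying a position (lon, lat) and a heading (yaw).  A first
# DBSCAN pass clusters the entry positions; inside every spatial cluster a second
# DBSCAN pass clusters the headings, and the results are printed and plotted.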

EARTH_RADIUS_M = 6400 * 1000            # mean Earth radius, in metres
Point = namedtuple("point", "lon lat")  # simple (lon, lat) record


def cluster_statistic(xy):
    """Return the centroid of a cluster of (lon, lat) points and its radius in metres."""
    center = np.array([np.average(xy[:, 0]), np.average(xy[:, 1])])
    center_p = Point(center[0], center[1])
    rad = 0
    for loc in xy:
        # spherical_distance returns a central angle; keep the largest one.
        rad = max(rad, metric.spherical_distance(center_p, Point(loc[0], loc[1])))
    rad *= EARTH_RADIUS_M
    return center, rad
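
# Reference sketch only -- an assumption about what metric.spherical_distance computes.
# Consistent with the EARTH_RADIUS_M scaling above and the eps value used below, it is
# treated here as the great-circle central angle (in radians) between two indexable
# (lon, lat) points given in degrees.  A hypothetical haversine equivalent:
def _spherical_distance_sketch(p, q):
    lon1, lat1, lon2, lat2 = np.radians([p[0], p[1], q[0], q[1]])
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return 2 * np.arcsin(np.sqrt(a))  # central angle in radians
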

if __name__ == '__main__':
    data = csv_parser.parse_data_from_csv('pitchtest0730.csv')
    # groupby only merges *consecutive* records with equal keys, so sort by hwid first.
    data = sorted(data, key=lambda x: x.hwid)
    groups = groupby(data, key=lambda x: x.hwid)
    entries = []
    for k, grp in groups:
        data1 = list(grp)
        data1.sort(key=lambda x: x.timestamp)
        # Recognize entries from this device's time-sorted records (not the whole data set).
        entries.extend(recog.recognize_entries(data1))
    for ent in entries:
        print(ent)
    x = []
    yaws = []
    for e in entries:
        x.append([e.lon, e.lat])
        yaws.append(e.yaw / 180 * pi)  # yaw comes in degrees; convert to radians
    x = np.array(x)
    yaws = np.array(yaws)
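    # DBSCAN accepts a callable metric; scikit-learn calls it on pairs of rows of x
    # (plain 1-D arrays), so spherical_distance must work with positional indexing,
    # which also lets cluster_statistic pass Point namedtuples to it.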
    # eps = 5 metres expressed as a central angle on a ~6,400 km sphere.
    db = DBSCAN(eps=5 / EARTH_RADIUS_M, min_samples=20,
                metric=metric.spherical_distance).fit(x)
    labels = db.labels_
    core_samples_mask = np.zeros_like(labels, dtype=bool)
    core_samples_mask[db.core_sample_indices_] = True
    n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise_ = list(labels).count(-1)
    print('Estimated number of clusters: %d' % n_clusters_)
    print('Estimated number of noise points: %d' % n_noise_)
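    # DBSCAN semantics: a point is a core sample if at least min_samples points
    # (itself included) lie within eps of it; non-core points reachable from a core
    # sample become border points of that cluster; everything else is labelled -1 (noise).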
    if n_clusters_ == 0:
        print('cannot get any clusters')
        plt.plot(x[:, 0], x[:, 1], 'o')
        plt.show()
        sys.exit(0)
    print("Silhouette Coefficient: %0.3f"
          % metrics.silhouette_score(x, labels))
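    # Note: silhouette_score needs at least two distinct labels; with exactly one
    # cluster and no noise points the call above would raise a ValueError.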
    unique_labels = set(labels)
    colors = [plt.cm.Spectral(each)
              for each in np.linspace(0, 1, len(unique_labels))]
    for k, col in zip(unique_labels, colors):
        if k == -1:
            # Black used for noise.
            col = [0, 0, 0, 1]
        class_member_mask = (labels == k)
        # Core samples: large markers; border (non-core) samples: small markers.
        xy = x[class_member_mask & core_samples_mask]
        plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
                 markeredgecolor='k', markersize=14)
        xy = x[class_member_mask & ~core_samples_mask]
        plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
                 markeredgecolor='k', markersize=6)
        xy = x[class_member_mask]
        if k != -1:
            center, rad = cluster_statistic(xy)
            print("cluster %d:" % k)
            print("size =", len(xy))
            print("center = %f, %f" % (center[0], center[1]))
            print("radius = %f m" % rad)
            yaw = yaws[class_member_mask]
            db_yaw = DBSCAN(eps=0.1, min_samples=100,
                            metric=metric.ang_distance).fit(yaw.reshape(-1, 1))
            lbs = set(db_yaw.labels_)
            arrow_colors = [plt.cm.Spectral(each)
                            for each in np.linspace(0, 1, len(lbs))]
            for lb in lbs:
                if lb != -1:
                    mask = (db_yaw.labels_ == lb)
                    center, rad = cluster_statistic(xy[mask])
                    print("  sub-cluster %d:" % lb)
                    print("  size =", np.sum(mask))
                    print("  center = %f, %f" % (center[0], center[1]))
                    print("  radius = %f m" % rad)
                    # Plain average of yaws; assumes the sub-cluster does not straddle 0/2*pi.
                    print("  avg yaw =",
                          np.average(np.fmod(yaw[mask], 2*pi)) / pi * 180)
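            # Draw heading arrows: plt.annotate with an empty string and arrowprops
            # draws just an arrow from xytext to xy; entries whose yaw was labelled
            # noise (-1) get no arrow.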
            for i in range(len(xy)):
                if db_yaw.labels_[i] == -1:
                    continue
                # Colour each arrow by its yaw sub-cluster.
                col = arrow_colors[db_yaw.labels_[i]]
                plt.annotate("", xytext=(xy[i, 0], xy[i, 1]),
                             xy=(xy[i, 0] + 2e-5 * sin(pi/2 - yaw[i]),
                                 xy[i, 1] + 2e-5 * cos(pi/2 - yaw[i])),
                             arrowprops=dict(arrowstyle="->",
                                             color=col))
    plt.show()