import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification
# GeoLMModel ships with the GeoLM authors' build of transformers; it is not
# part of the stock transformers release.
from transformers import GeoLMModel
import requests
import numpy as np
import pandas as pd
import streamlit as st
import folium

from haversine import haversine, Unit

# Populated once in the __main__ block and read by search_geonames().
dataset = None

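# Assumed layout of geohash.csv, inferred from the column accesses in
# get50Neighbors below (the file itself is not shipped with this script):
#
#   GeonameID,Name,Latitude,Longitude,Geohash
#   5368361,Los Angeles,34.05223,-118.24368,9q5ctr186
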
def generate_human_readable(tokens, labels):
    """Merge WordPiece pieces and multi-word spans back into readable names."""
    ret = []
    for t, lab in zip(tokens, labels):
        if t == '[SEP]':
            continue
        if t.startswith("##"):
            # Subword continuation: glue onto the previous token.
            assert len(ret) > 0
            ret[-1] = ret[-1] + t.strip('##')
        elif lab == 2:
            # Inside-label: continuation word of a multi-word toponym.
            assert len(ret) > 0
            ret[-1] = ret[-1] + " " + t.strip('##')
        else:
            ret.append(t)
    return ret

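# Example (labels assumed to follow 0 = O, 1 = B-toponym, 2 = I-toponym):
#   generate_human_readable(['Los', 'Angeles', 'San', 'Fran', '##cisco'],
#                           [1, 2, 1, 2, 2])
#   -> ['Los Angeles', 'San Francisco']
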
def getSlice(tensor):
    """Group consecutive non-zero label positions into index spans."""
    result = []
    curr = []
    for index, value in enumerate(tensor[0]):
        if value == 1 or value == 2:
            curr.append(index)
        if value == 0 and curr != []:
            result.append(curr)
            curr = []
    # Flush a span that reaches the end of the sequence.
    if curr:
        result.append(curr)
    return result

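# Example: getSlice(torch.tensor([[0, 1, 2, 0, 1, 0]])) -> [[1, 2], [4]]
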
def getIndex(input_sentence):
    """Return the token-index spans of every toponym detected in the sentence."""
    tokenizer, model = getModel1()

    tokens = tokenizer.encode(input_sentence, return_tensors="pt")
    outputs = model(tokens)

    # Per-token label ids: 0 = outside, 1/2 = part of a toponym span.
    predicted_labels = torch.argmax(outputs.logits, dim=2)

    slices = getSlice(predicted_labels)
    return slices

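# Illustration: getIndex("I live in Los Angeles.") might return [[4, 5]],
# the WordPiece positions of "Los Angeles" (actual indices depend on the
# tokenizer and model predictions).
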
def cutSlices(tensor, slicesList):
    """Pool each toponym span of hidden states into a single 768-d vector."""
    locationTensor = torch.zeros(1, len(slicesList), 768)

    curr = 0
    for span in slicesList:
        if len(span) == 1:
            # Single-token toponym: take its hidden state as-is.
            locationTensor[0][curr] = tensor[0][span[0]]
            curr = curr + 1
        if len(span) > 1:
            # Multi-token toponym: mean-pool the hidden states over the span.
            sliceTensor = tensor[0][span[0]:span[-1] + 1]
            sliceTensor = sliceTensor.unsqueeze(0)
            mean = torch.mean(sliceTensor, dim=1, keepdim=True)
            locationTensor[0][curr] = mean[0]
            curr = curr + 1

    return locationTensor

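# Shape sketch: given hidden states of shape (1, seq_len, 768),
# cutSlices(hidden, [[4, 5], [7]]) returns a (1, 2, 768) tensor.
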
def MLearningFormInput(input_sentence):
    """Encode a sentence with GeoLM and return one embedding per toponym."""
    tokenizer, model = getModel2()

    tokens = tokenizer.encode(input_sentence, return_tensors="pt")

    # No coordinates are known at inference time, so the spatial position
    # lists are all zeros.
    outputs = model(tokens,
                    spatial_position_list_x=torch.zeros(tokens.shape),
                    spatial_position_list_y=torch.zeros(tokens.shape))

    slicesIndex = getIndex(input_sentence)

    res = cutSlices(outputs.last_hidden_state, slicesIndex)

    return res

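# For a sentence with a single detected toponym the result has shape
# (1, 1, 768); with n detected toponyms, (1, n, 768).
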
def getLocationName(input_sentence):
    """Run toponym recognition and return the detected place names as strings."""
    tokenizer, model = getModel1()

    tokens = tokenizer.encode(input_sentence, return_tensors="pt")
    outputs = model(tokens)

    predicted_labels = torch.argmax(outputs.logits, dim=2)

    # Keep only the tokens whose predicted label is not 0 ('outside').
    query_tokens = tokens[0][torch.where(predicted_labels[0] != 0)[0]]
    query_labels = predicted_labels[0][torch.where(predicted_labels[0] != 0)[0]]

    human_readable = generate_human_readable(
        tokenizer.convert_ids_to_tokens(query_tokens), query_labels)

    return human_readable

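# Illustration: getLocationName("I flew from Paris to Toronto.") should give
# ['Paris', 'Toronto'] (exact output depends on the model's predictions).
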
def search_geonames(toponym, df):
    """Fetch candidate places for a toponym from GeoNames and embed each one."""
    global dataset

    api_endpoint = "http://api.geonames.org/searchJSON"
    username = "zekun"

    params = {
        'q': toponym,
        'username': username,
        'maxRows': 10,
    }

    response = requests.get(api_endpoint, params=params)
    data = response.json()

    result = []
    lat = []
    lon = []

    if 'geonames' in data:
        for place_info in data['geonames']:
            latitude = float(place_info.get('lat', 0.0))
            longitude = float(place_info.get('lng', 0.0))

            lat.append(latitude)
            lon.append(longitude)

            geoname_id = place_info.get('geonameId', '')

            res = get50Neighbors(geoname_id, dataset, k=50)
            result.append(res)

    df['lat'] = lat
    df['lon'] = lon

    # (1, num_candidates, 768) stack of GeoLM embeddings, one per candidate.
    result = torch.cat(result, dim=1).detach().numpy()
    return result

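# The relevant part of a GeoNames searchJSON response looks roughly like:
#   {"geonames": [{"geonameId": 5368361, "name": "Los Angeles",
#                  "lat": "34.05223", "lng": "-118.24368", ...}, ...]}
# (field names per the public GeoNames API; the values are illustrative).
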
def get50Neighbors(locationID, dataset, k=50):
    """Embed a GeoNames place together with its k nearest neighbors via GeoLM."""
    input_row = dataset.loc[dataset['GeonameID'] == locationID].iloc[0]

    lat, lon, geohash, name = (input_row['Latitude'], input_row['Longitude'],
                               input_row['Geohash'], input_row['Name'])

    # Prefilter by a shared 7-character geohash prefix (a cell of roughly
    # 150 m x 150 m) before computing exact haversine distances.
    filtered_dataset = dataset.loc[dataset['Geohash'].str.startswith(geohash[:7])].copy()

    filtered_dataset['distance'] = filtered_dataset.apply(
        lambda row: haversine((lat, lon), (row['Latitude'], row['Longitude']),
                              Unit.KILOMETERS),
        axis=1
    )

    filtered_dataset = filtered_dataset.sort_values(by='distance')

    nearest_neighbors = filtered_dataset.head(k)[['Name']]
    neighbors = nearest_neighbors.values.tolist()

    tokenizer, model = getModel1_0()

    sep_token_id = tokenizer.convert_tokens_to_ids(tokenizer.sep_token)
    cls_token_id = tokenizer.convert_tokens_to_ids(tokenizer.cls_token)

    # Build a pseudo-sentence: [CLS] neighbor [SEP] neighbor [SEP] ...
    # The target place has distance 0, so it sorts first and its own tokens
    # occupy positions 1 .. len(target_token), right after [CLS].
    neighbor_token_list = []
    neighbor_token_list.append(cls_token_id)

    target_token = tokenizer.convert_tokens_to_ids(tokenizer.tokenize(name))

    for neighbor in neighbors:
        neighbor_token = tokenizer.convert_tokens_to_ids(tokenizer.tokenize(neighbor[0]))
        neighbor_token_list.extend(neighbor_token)
        neighbor_token_list.append(sep_token_id)

    tokens = torch.Tensor(neighbor_token_list).unsqueeze(0).long()

    outputs = model(tokens,
                    spatial_position_list_x=torch.zeros(tokens.shape),
                    spatial_position_list_y=torch.zeros(tokens.shape))

    targetIndex = list(range(1, len(target_token) + 1))

    # Mean-pool the target span into a single (1, 1, 768) embedding.
    res = cutSlices(outputs.last_hidden_state, [targetIndex])

    return res

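# Design note: a place is represented here by the company it keeps on the
# map. Encoding the candidate amid its geographic neighbors yields an
# embedding comparable against the sentence-side toponym embedding.
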
def cosine_similarity(target_feature, candidate_feature):
    """Cosine similarity between two feature vectors, returned as a float."""
    target_feature = target_feature.squeeze()
    candidate_feature = candidate_feature.squeeze()

    dot_product = torch.dot(target_feature, candidate_feature)
    target = torch.norm(target_feature)
    candidate = torch.norm(candidate_feature)

    similarity = dot_product / (target * candidate)

    return similarity.item()

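# Sanity check: cosine_similarity(v, v) == 1.0 for any nonzero v,
# e.g. v = torch.ones(768).
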
@st.cache_data
def getCSV():
    dataset = pd.read_csv('geohash.csv')
    return dataset

# Models are unpicklable resources, so they are cached with
# st.cache_resource rather than st.cache_data.
@st.cache_resource
def getModel1():
    # Token-classification head for toponym recognition.
    model_name = "zekun-li/geolm-base-toponym-recognition"

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name)

    return tokenizer, model

@st.cache_resource
def getModel1_0():
    # Same checkpoint as getModel1, but loaded as a bare GeoLM encoder so
    # that last_hidden_state is available for pooling.
    model_name = "zekun-li/geolm-base-toponym-recognition"

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = GeoLMModel.from_pretrained(model_name)
    return tokenizer, model

@st.cache_resource
def getModel2():
    # Bare GeoLM encoder used to embed the input sentence.
    model_name = "zekun-li/geolm-base-cased"

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = GeoLMModel.from_pretrained(model_name)

    return tokenizer, model

def showing(df):
    """Render the candidates on a folium map, sized and colored by probability."""
    m = folium.Map(location=[df['lat'].mean(), df['lon'].mean()], zoom_start=5)

    size_scale = 100
    color_scale = 255
    for i in range(len(df)):
        lat, lon, prob = df.iloc[i]['lat'], df.iloc[i]['lon'], df.iloc[i]['prob']

        # Square the probability so that confident candidates dominate.
        size = int(prob ** 2 * size_scale)
        color = int(prob ** 2 * color_scale)

        folium.CircleMarker(
            location=[lat, lon],
            radius=size,
            color=f'#{color:02X}0000',
            fill=True,
            fill_color=f'#{color:02X}0000'
        ).add_to(m)

    m.save("map.html")

    with open("map.html", "r", encoding="utf-8") as f:
        map_html = f.read()

    st.components.v1.html(map_html, height=600)

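# Marker color runs from near-black at low probability to bright red
# (#FF0000) as the probability approaches 1.
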
def mapping(selected_place, locations, sentence_info):
    """Rank the GeoNames candidates for the selected place and map them."""
    location_index = locations.index(selected_place)

    df = pd.DataFrame()

    same_name_embedding = search_geonames(selected_place, df)
    same_name_embedding = torch.tensor(same_name_embedding)

    sim_matrix = []
    for i in range(same_name_embedding.size(1)):
        similarities = cosine_similarity(sentence_info[:, location_index, :],
                                         same_name_embedding[:, i, :])
        sim_matrix.append(similarities)

    # Squash cosine similarities in [-1, 1] into pseudo-probabilities.
    def sigmoid(x):
        return 1 / (1 + np.exp(-x))

    prob_matrix = sigmoid(np.array(sim_matrix))
    df['prob'] = prob_matrix

    showing(df)

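# Note: sigmoid maps cosine similarity -1 -> ~0.27, 0 -> 0.5, and 1 -> ~0.73,
# so the displayed "probabilities" fall in a fairly narrow band.
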
def show_on_map():
    input_sentence = st.text_area("Enter a sentence:", height=200)

    # The button only triggers a Streamlit rerun; the text_area state persists.
    st.button("Submit")

    # Avoid running the models on an empty text area.
    if not input_sentence:
        return

    sentence_info = MLearningFormInput(input_sentence)

    locations = getLocationName(input_sentence)

    selected_place = st.selectbox("Select a location:", locations)

    if selected_place is not None:
        mapping(selected_place, locations, sentence_info)

| | if __name__ == "__main__": |
| |
|
| |
|
| | dataset = getCSV() |
| |
|
| | show_on_map() |
| | |
| | |
| | |
| | |
| | |
| | |
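# To launch locally (assuming this file is saved as app.py):
#   streamlit run app.py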