# Source code for geograpy.locator

'''
The locator module retrieves detailed city
information, including the region and country of a city, from a
location string.

Examples for location strings are:

    Amsterdam, Netherlands
    Vienna, Austria
    Vienna, IL
    Paris - Texas
    Paris TX
    
The locator will look up the cities and try to disambiguate the result based on the country or region information found.

The results in string representation are:
    
    Amsterdam (NH(North Holland) - NL(Netherlands))
    Vienna (9(Vienna) - AT(Austria))
    Vienna (IL(Illinois) - US(United States))
    Paris (TX(Texas) - US(United States)) 
    Paris (TX(Texas) - US(United States))
    
Each city returned has a city.region and city.country attribute with the details of the city.
    

Created on 2020-09-18

@author: wf
'''
import csv
import gzip
import os
import re
import shutil
import sys
import urllib
import urllib.request
from argparse import ArgumentParser
from argparse import RawDescriptionHelpFormatter

import pycountry
from lodstorage.sql import SQLDB

from geograpy import wikidata
from geograpy.utils import remove_non_ascii
from geograpy.wikidata import Wikidata

class City(object):
    '''
    a single city as an object

    Cities created via fromGeoLite2 carry the attributes name, population,
    gdp, region (Region) and country (Country).
    '''

    def __init__(self):
        pass

    def __str__(self):
        text = "%s (%s - %s)" % (self.name, self.region, self.country)
        return text

    def setValue(self, name, record):
        '''
        set a field value with the given name to the given record dict's
        corresponding entry or None

        Args:
            name(string): the name of the field
            record(dict): the dict to get the value from
        '''
        value = record[name] if name in record else None
        self.__dict__[name] = value

    @staticmethod
    def _populationFrom(record):
        '''
        get the population from the given record

        The GeoLite2CityLookup view exposes the population as "cityPop" while
        other records use "population" - the first non-None entry wins.

        Args:
            record(dict): the record as returned from a query

        Returns:
            the population value or None if the record has none
        '''
        for key in ('population', 'cityPop'):
            if key in record and record[key] is not None:
                return record[key]
        return None

    @staticmethod
    def fromGeoLite2(record):
        '''
        create a city from a GeoLite2 lookup record

        Args:
            record(dict): the record as returned from a query

        Returns:
            City: the city with its region and country details
        '''
        city = City()
        city.name = record['name']
        # fall back to the Wikidata label if there is no GeoLite2 name
        if not city.name:
            city.name = record['wikidataName']
        # without the cityPop fallback the population would always be None
        # for records of the GeoLite2CityLookup view
        city.population = City._populationFrom(record)
        city.setValue('gdp', record)
        city.region = Region.fromGeoLite2(record)
        city.country = Country.fromGeoLite2(record)
        return city
class Region(object):
    '''
    a Region (Subdivision) described by an ISO code and a name
    '''

    def __init__(self):
        pass

    def __str__(self):
        return "{}({})".format(self.iso, self.name)

    @staticmethod
    def fromGeoLite2(record):
        '''
        create a region from a Geolite2 record

        Args:
            record(dict): the record as returned from a query

        Returns:
            Region: the corresponding region information
        '''
        result = Region()
        result.name = record['regionName']
        # the ISO code is composed of the country and the region part
        countryPart = record['countryIsoCode']
        regionPart = record['regionIsoCode']
        result.iso = "{}-{}".format(countryPart, regionPart)
        return result

    @staticmethod
    def fromWikidata(record):
        '''
        create a region from a Wikidata record

        Args:
            record(dict): the record as returned from a query

        Returns:
            Region: the corresponding region information
        '''
        result = Region()
        result.name = record['regionLabel']
        # the Wikidata record's code is taken over unchanged
        result.iso = record['regionIsoCode']
        return result
class Country(object):
    '''
    a country described by an ISO code and a name
    '''

    def __init__(self):
        pass

    def __str__(self):
        return "{}({})".format(self.iso, self.name)

    @staticmethod
    def fromGeoLite2(record):
        '''
        create a country from a geolite2 record

        Args:
            record(dict): the record as returned from a query

        Returns:
            Country: the corresponding country information
        '''
        result = Country()
        result.name = record['countryName']
        result.iso = record['countryIsoCode']
        return result

    @staticmethod
    def fromPyCountry(pcountry):
        '''
        create a country from a pycountry country

        Args:
            pcountry(PyCountry): a country as gotten from pycountry

        Returns:
            Country: the country
        '''
        result = Country()
        result.name = pcountry.name
        # the two letter alpha_2 code serves as ISO code
        result.iso = pcountry.alpha_2
        return result
class Locator(object):
    '''
    location handling

    Looks up cities, regions and countries for place names using a local
    SQL lookup database and disambiguates between the candidates found.
    '''

    # singleton instance
    locator = None

    def __init__(self, db_file=None, correctMisspelling=False, debug=False):
        '''
        Constructor

        Args:
            db_file(str): the path to the database file - if not given locs.db in the directory of this module is used
            correctMisspelling(bool): if True correct typical misspellings
            debug(bool): if True show debug information
        '''
        self.debug = debug
        self.correctMisspelling = correctMisspelling
        self.db_path = os.path.dirname(os.path.realpath(__file__))
        self.db_file = db_file or self.db_path + "/locs.db"
        self.view = "GeoLite2CityLookup"
        self.sqlDB = SQLDB(self.db_file, errorDebug=True)
        self.getAliases()
        # version the lookup database must have to be considered up to date
        self.dbVersion = "2020-09-27 16:48:09"

    @staticmethod
    def resetInstance():
        '''
        forget the singleton instance so that the next getInstance call creates a new one
        '''
        Locator.locator = None

    @staticmethod
    def getInstance(correctMisspelling=False, debug=False):
        '''
        get the singleton instance of the Locator. If parameters are changed on further calls
        the initial parameters will still be in effect since the original instance will be returned!

        Args:
            correctMisspelling(bool): if True correct typical misspellings
            debug(bool): if True show debug information

        Returns:
            Locator: the singleton instance
        '''
        if Locator.locator is None:
            Locator.locator = Locator(correctMisspelling=correctMisspelling, debug=debug)
        return Locator.locator

    def locateCity(self, places):
        '''
        locate a city, region country combination based on the given wordtoken information

        Args:
            places(list): a list of places derived by splitting a locality e.g. "San Francisco, CA"
            leads to "San Francisco", "CA"

        Returns:
            City: a city with country and region details
        '''
        # make sure the database is populated
        self.populate_db()
        country = None
        cities = []
        regions = []
        # loop over all word elements
        for place in places:
            place = place.strip()
            # replace known aliases by their target name
            place = self.aliases.get(place, place)
            foundCountry = self.getCountry(place)
            if foundCountry is not None:
                # the last country found wins
                country = foundCountry
            cities.extend(self.cities_for_name(place))
            regions.extend(self.regions_for_name(place))
        foundCity = self.disambiguate(country, regions, cities)
        return foundCity

    def isISO(self, s):
        '''
        check if the given string is an ISO code

        Args:
            s(string): the string to check

        Returns:
            bool: True if the string is an ISO Code
        '''
        # optional one or two letter prefix with dash followed by
        # one to three upper case letters or digits e.g. US, US-TX, AT-9
        m = re.search(r"^([A-Z]{1,2}\-)?[0-9A-Z]{1,3}$", s)
        result = m is not None
        return result

    def disambiguate(self, country, regions, cities, byPopulation=True):
        '''
        try determining country, regions and city from the potential choices

        Args:
            country(Country): a matching country found
            regions(list): a list of matching Regions found
            cities(list): a list of matching cities found
            byPopulation(bool): if True pick the city with the highest population if neither country nor region decide

        Return:
            City: the found city or None
        '''
        if self.debug:
            print("countries: %s " % country)
            print("regions: %s" % "\n\t".join(str(r) for r in regions))
            print("cities: %s" % "\n\t".join(str(c) for c in cities))
        # nothing to choose from - this also keeps max() below from ever seeing an empty list
        if not cities:
            return None
        # is the city information unique?
        if len(cities) == 1:
            return cities[0]
        foundCity = None
        # first choice: a city of the country found
        if country is not None:
            for city in cities:
                if self.debug:
                    print("city %s: " % (city))
                if city.country.iso == country.iso:
                    foundCity = city
                    break
        # second choice: a city of one of the regions found - cities
        # named like their own region are skipped
        if foundCity is None and len(regions) > 0:
            for region in regions:
                for city in cities:
                    if city.region.iso == region.iso and not city.region.name == city.name:
                        foundCity = city
                        break
                if foundCity is not None:
                    break
        # last choice: the city with the highest population (unknown counts as 0)
        if foundCity is None and byPopulation:
            foundCity = max(cities, key=lambda city: 0 if city.population is None else city.population)
        return foundCity

    def cities_for_name(self, cityName):
        '''
        find cities with the given cityName

        Args:
            cityName(string): the potential name of a city

        Returns:
            a list of City objects matching by name or by wikidataName
        '''
        cities = []
        for column in ['name', 'wikidataName']:
            cityRecords = self.places_by_name(cityName, column)
            for cityRecord in cityRecords:
                cities.append(City.fromGeoLite2(cityRecord))
        return cities

    def regions_for_name(self, region_name):
        '''
        get the regions for the given region_name (which might be an ISO code)

        Args:
            region_name(string): region name

        Returns:
            list: the list of regions for this region name
        '''
        regions = []
        if self.isISO(region_name):
            columnName = "regionIsoCode"
        else:
            columnName = 'regionLabel'
        # only the column name is interpolated (it is one of the two constants
        # above) - the region name itself is passed as a bound parameter
        query = "SELECT * from regions WHERE %s = (?)" % (columnName)
        params = (region_name,)
        regionRecords = self.sqlDB.query(query, params)
        for regionRecord in regionRecords:
            regions.append(Region.fromWikidata(regionRecord))
        return regions

    def correct_country_misspelling(self, name):
        '''
        correct potential misspellings

        Args:
            name(string): the name of the country potentially misspelled

        Return:
            string: correct name or unchanged
        '''
        cur_dir = os.path.dirname(os.path.realpath(__file__))
        with open(cur_dir + "/data/ISO3166ErrorDictionary.csv") as info:
            reader = csv.reader(info)
            for row in reader:
                # the first column is compared, the third column is returned on a match
                if name in remove_non_ascii(row[0]):
                    return row[2]
        return name

    def is_a_country(self, name):
        '''
        check if the given string name is a country

        Args:
            name(string): the string to check

        Returns:
            True: if pycountry thinks the string is a country
        '''
        country = self.getCountry(name)
        result = country is not None
        return result

    def getCountry(self, name):
        '''
        get the country for the given name

        Args:
            name(string): the name of the country to lookup

        Returns:
            country: the country if one was found or None if not
        '''
        if self.isISO(name):
            pcountry = pycountry.countries.get(alpha_2=name)
        else:
            if self.correctMisspelling:
                name = self.correct_country_misspelling(name)
            pcountry = pycountry.countries.get(name=name)
        country = None
        if pcountry is not None:
            country = Country.fromPyCountry(pcountry)
        # a fallback lookup in the countries table is not implemented:
        #if country is None:
        #    query="SELECT * FROM countries WHERE countryLabel = (?)"""
        #    params=(name,)
        #    countryRecords=self.sqlDB.query(query,params)
        #    if len(countryRecords)>0:
        #        pass
        return country

    def getView(self):
        '''
        get the view to be used

        Returns:
            str: the SQL view to be used for CityLookups e.g. GeoLite2CityLookup
        '''
        view = self.view
        return view

    def places_by_name(self, placeName, columnName):
        '''
        get places by name and column

        Args:
            placeName(string): the name of the place
            columnName(string): the column to look at

        Returns:
            list: the records of the lookup view matching the place name
        '''
        if not self.db_has_data():
            self.populate_db()
        view = self.getView()
        # view and column name are interpolated, the place name is a bound parameter
        query = 'SELECT * FROM %s WHERE %s = (?)' % (view, columnName)
        params = (placeName,)
        cities = self.sqlDB.query(query, params)
        return cities

    def getGeolite2Cities(self):
        '''
        get the Geolite2 City-Locations as a list of Dicts

        Returns:
            list: a list of Geolite2 City-Locator dicts
        '''
        cities = self.readCSV("GeoLite2-City-Locations-en.csv")
        return cities

    def readCSV(self, fileName):
        '''
        read the CSV file with the given name from the data directory of this module

        Args:
            fileName(string): the name of the CSV file in the data directory

        Returns:
            list: one dict per row of the CSV file
        '''
        records = []
        cur_dir = os.path.dirname(os.path.realpath(__file__))
        csvfile = "%s/data/%s" % (cur_dir, fileName)
        with open(csvfile) as info:
            reader = csv.DictReader(info)
            for row in reader:
                records.append(row)
        return records

    def recreateDatabase(self):
        '''
        recreate my lookup database
        '''
        print("recreating database ... %s" % self.db_file)
        self.populate_db(force=True)

    def populate_db(self, force=False):
        '''
        populate the cities SQL database which caches the information from the GeoLite2-City-Locations.csv file

        If force is not set and the database has no (up to date) data a
        prepared copy of the database is downloaded and unzipped instead.

        Args:
            force(bool): if True force a recreation of the database

        Raises:
            RuntimeError: if the lookup database file does not exist afterwards
        '''
        hasData = self.db_has_data()
        if force:
            self.populate_Cities(self.sqlDB)
            self.populateFromWikidata(self.sqlDB)
            self.getWikidataCityPopulation(self.sqlDB)
            self.createViews(self.sqlDB)
            self.populate_Version(self.sqlDB)
        elif not hasData:
            url = "http://wiki.bitplan.com/images/confident/locs.db.gz"
            zipped = self.db_file + ".gz"
            print("Downloading %s from %s ... this might take a few seconds" % (zipped, url))
            urllib.request.urlretrieve(url, zipped)
            print("unzipping %s from %s" % (self.db_file, zipped))
            with gzip.open(zipped, 'rb') as gzipped:
                with open(self.db_file, 'wb') as unzipped:
                    shutil.copyfileobj(gzipped, unzipped)
        if not os.path.isfile(self.db_file):
            # raising a plain string is a TypeError in Python 3 and would hide this message
            raise RuntimeError("could not create lookup database %s" % self.db_file)

    def populate_Version(self, sqlDB):
        '''
        populate the version table

        Args:
            sqlDB(SQLDB): target SQL database
        '''
        versionList = [{"version": self.dbVersion}]
        entityInfo = sqlDB.createTable(versionList, "Version", "version", withDrop=True)
        sqlDB.store(versionList, entityInfo)

    def getAliases(self):
        '''
        get the aliases hashTable

        reads aliases.csv and fills self.aliases with its name -> alias entries
        '''
        aliases = self.readCSV("aliases.csv")
        self.aliases = {}
        for alias in aliases:
            self.aliases[alias['name']] = alias['alias']

    def populateFromWikidata(self, sqlDB):
        '''
        populate countries and regions from Wikidata

        Args:
            sqlDB(SQLDB): target SQL database
        '''
        self.populate_Countries(sqlDB)
        self.populate_Regions(sqlDB)
        return
        # ignore the following code as of 2020-09-26
        self.populate_Cities_FromWikidata(sqlDB)
        viewDDLs = [
            "DROP VIEW IF EXISTS WikidataCityLookup",
            (
                " CREATE VIEW WikidataCityLookup AS"
                " SELECT name AS name, regionLabel as regionName, regionIsoCode as regionIsoCode,"
                " countryLabel as countryName, countryIsoCode as countryIsoCode,"
                " cityPopulation as population, countryGDP_perCapita as gdp"
                " FROM City_wikidata "
            ),
        ]
        # subdivision_1_name AS regionName,
        # subdivision_1_iso_code as regionIsoCode,
        # country_name AS countryName,
        # country_iso_code as countryIsoCode
        for viewDDL in viewDDLs:
            self.sqlDB.execute(viewDDL)

    def populate_Countries(self, sqlDB):
        '''
        populate database with countries from wikiData

        Args:
            sqlDB(SQLDB): target SQL database
        '''
        print("retrieving Country data from wikidata ... (this might take a few seconds)")
        wikidata = Wikidata()
        wikidata.getCountries()
        entityInfo = sqlDB.createTable(wikidata.countryList, "countries", None, withDrop=True, sampleRecordCount=200)
        sqlDB.store(wikidata.countryList, entityInfo, fixNone=True)

    def populate_Regions(self, sqlDB):
        '''
        populate database with regions from wikiData

        Args:
            sqlDB(SQLDB): target SQL database
        '''
        print("retrieving Region data from wikidata ... (this might take a minute)")
        wikidata = Wikidata()
        wikidata.getRegions()
        # only the first 5000 records are handed to createTable, all records are stored
        entityInfo = sqlDB.createTable(wikidata.regionList[:5000], "regions", primaryKey=None, withDrop=True)
        sqlDB.store(wikidata.regionList, entityInfo, fixNone=True)

    def populate_Cities_FromWikidata(self, sqlDB):
        '''
        populate the given sqlDB with the Wikidata Cities

        Args:
            sqlDB(SQLDB): target SQL database
        '''
        dbFile = self.db_path + "/City_wikidata.db"
        if not os.path.exists(dbFile):
            print("Downloading %s ... this might take a few seconds" % dbFile)
            dbUrl = "http://wiki.bitplan.com/images/confident/City_wikidata.db"
            urllib.request.urlretrieve(dbUrl, dbFile)
        wikiCitiesDB = SQLDB(dbFile)
        wikiCitiesDB.copyTo(sqlDB)

    def getWikidataCityPopulation(self, sqlDB, endpoint=None):
        '''
        make sure the citiesWithPopulation table of the target database is populated

        Args:
            sqlDB(SQLDB): target SQL database
            endpoint(str): url of the wikidata endpoint or None if default should be used
        '''
        dbFile = self.db_path + "/city_wikidata_population.db"
        rawTableName = "cityPops"
        # is the wikidata population database available?
        if not os.path.exists(dbFile):
            # shall we create it from a wikidata query?
            if endpoint is not None:
                wikidata = Wikidata()
                wikidata.endpoint = endpoint
                cityList = wikidata.getCityPopulations()
                wikiCitiesDB = SQLDB(dbFile)
                entityInfo = wikiCitiesDB.createTable(cityList[:300], rawTableName, primaryKey=None, withDrop=True)
                wikiCitiesDB.store(cityList, entityInfo, fixNone=True)
            else:
                # just download a copy
                print("Downloading %s ... this might take a few seconds" % dbFile)
                dbUrl = "http://wiki.bitplan.com/images/confident/city_wikidata_population.db"
                urllib.request.urlretrieve(dbUrl, dbFile)
        # (re) open the database
        wikiCitiesDB = SQLDB(dbFile)
        # check whether the table is populated
        tableList = sqlDB.getTableList()
        tableName = "citiesWithPopulation"
        if self.db_recordCount(tableList, tableName) < 10000:
            # check that database is writable
            # https://stackoverflow.com/a/44707371/1497139
            sqlDB.execute("pragma user_version=0")
            # makes sure both tables are in target sqlDB
            wikiCitiesDB.copyTo(sqlDB)
            # create joined table: GeoLite2 cities joined with their population
            # plus the cityPops entries with a population
            sqlQuery = (
                " select geoname_id, city_name, cp.cityLabel, country_iso_code, country_name,"
                " subdivision_1_iso_code, subdivision_1_name, cp.city as wikidataurl, cp.cityPop"
                " from cities c join cityPops cp on c.geoname_id=cp.geoNameId"
                " union"
                " select geoNameId as geoname_id, null as city_name, cityLabel,"
                " countryIsoCode as country_iso_code, countryLabel as country_name,"
                " null as subdivision_1_iso_code, null as subdivision_1_name,"
                " city as wikidataurl, cityPop"
                " from cityPops where cityPop is not Null"
                " group by geoNameId order by cityPop desc "
            )
            cityList = sqlDB.query(sqlQuery)
            entityInfo = sqlDB.createTable(cityList, tableName, primaryKey=None, withDrop=True, sampleRecordCount=500)
            sqlDB.store(cityList, entityInfo, fixNone=True)
            # remove raw Table
            #sqlCmd="DROP TABLE %s " %rawTableName
            #sqlDB.execute(sqlCmd)

    def populate_Cities(self, sqlDB):
        '''
        populate the given sqlDB with the Geolite2 Cities

        Args:
            sqlDB(SQLDB): the SQL database to use
        '''
        cities = self.getGeolite2Cities()
        entityName = "cities"
        primaryKey = "geoname_id"
        # only the first 100 records are handed to createTable, all records are stored
        entityInfo = sqlDB.createTable(cities[:100], entityName, primaryKey, withDrop=True)
        sqlDB.store(cities, entityInfo, executeMany=False)

    def createViews(self, sqlDB):
        '''
        (re)create the GeoLite2CityLookup view on the citiesWithPopulation table

        Args:
            sqlDB(SQLDB): the SQL database to create the view in
        '''
        viewDDLs = [
            "DROP VIEW IF EXISTS GeoLite2CityLookup",
            (
                " CREATE VIEW GeoLite2CityLookup AS"
                " SELECT city_name AS name, cityLabel AS wikidataName, wikidataurl, cityPop,"
                " subdivision_1_name AS regionName, subdivision_1_iso_code as regionIsoCode,"
                " country_name AS countryName, country_iso_code as countryIsoCode"
                " FROM citiesWithPopulation "
            ),
        ]
        for viewDDL in viewDDLs:
            sqlDB.execute(viewDDL)

    def db_recordCount(self, tableList, tableName):
        '''
        count the number of records for the given tableName

        Args:
            tableList(list): the list of table to check
            tableName(str): the name of the table to check

        Returns
            int: the number of records found for the table - 0 if the table is not in the tableList
        '''
        tableFound = False
        for table in tableList:
            if table['name'] == tableName:
                tableFound = True
                break
        count = 0
        if tableFound:
            # the count is always taken from my own database
            query = "SELECT Count(*) AS count FROM %s" % tableName
            countResult = self.sqlDB.query(query)
            count = countResult[0]['count']
        return count

    def db_has_data(self):
        '''
        check whether the database has data / is populated

        Returns:
            boolean: True if the Version table holds exactly the expected version
            and there are more than 10000 citiesWithPopulation, more than 100
            countries and more than 1000 regions records
        '''
        tableList = self.sqlDB.getTableList()
        hasCities = self.db_recordCount(tableList, "citiesWithPopulation") > 10000
        hasCountries = self.db_recordCount(tableList, "countries") > 100
        hasRegions = self.db_recordCount(tableList, "regions") > 1000
        hasVersion = self.db_recordCount(tableList, "Version") == 1
        versionOk = False
        if hasVersion:
            query = "SELECT version from Version"
            dbVersionList = self.sqlDB.query(query)
            versionOk = dbVersionList[0]['version'] == self.dbVersion
        #hasWikidataCities=self.db_recordCount(tableList,'City_wikidata')>100000
        ok = hasVersion and versionOk and hasCities and hasRegions and hasCountries
        return ok
# version of this module - shown by the --version command line option
__version__ = '0.1.15'
# creation date - shown in the command line help text
__date__ = '2020-09-26'
# date of the last update - shown as build date by the --version option
__updated__ = '2020-09-26'
# if truthy main re-raises exceptions instead of reporting them on stderr
# and the -d option is appended when the module is run as a script
DEBUG = 1
def main(argv=None): # IGNORE:C0111
    '''
    main program.

    Args:
        argv(list): additional command line arguments to be parsed after the
            ones given in sys.argv - None if only sys.argv is to be used

    Returns:
        int: 0 on success, 1 on a keyboard interrupt and 2 if an exception
        occurred while DEBUG is off (with DEBUG on the exception is re-raised)
    '''
    # collect the arguments to parse without modifying the global sys.argv
    if argv is None:
        cmdline_args = sys.argv[1:]
    else:
        cmdline_args = sys.argv[1:] + list(argv)

    program_name = os.path.basename(sys.argv[0])
    program_version = "v%s" % __version__
    program_build_date = str(__updated__)
    program_version_message = '%%(prog)s %s (%s)' % (program_version, program_build_date)
    # the short description is the second line of the docstring of the main
    # module - which may be missing or too short if main is called from
    # another program
    doc = __import__('__main__').__doc__ or __doc__ or ""
    doc_lines = doc.split("\n")
    program_shortdesc = doc_lines[1] if len(doc_lines) > 1 else doc.strip()
    user_name = "Wolfgang Fahl"
    program_license = (
        '%s Created by %s on %s.'
        ' Copyright 2020 Wolfgang Fahl. All rights reserved.'
        ' Licensed under the Apache License 2.0'
        ' http://www.apache.org/licenses/LICENSE-2.0'
        ' Distributed on an "AS IS" basis without warranties'
        ' or conditions of any kind, either express or implied.'
        ' USAGE '
    ) % (program_shortdesc, user_name, str(__date__))

    try:
        # Setup argument parser
        parser = ArgumentParser(description=program_license, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("-d", "--debug", dest="debug", action="store_true", help="if True show debug information")
        parser.add_argument("-cm", "--correctSpelling", dest="correctMisspelling", action="store_true", help="if True correct typical misspellings")
        parser.add_argument("-db", "--recreateDatabase", dest='recreateDatabase', action="store_true", help="recreate the database")
        parser.add_argument('-V', '--version', action='version', version=program_version_message)

        # Process arguments
        args = parser.parse_args(cmdline_args)
        loc = Locator.getInstance(correctMisspelling=args.correctMisspelling, debug=args.debug)
        if args.recreateDatabase:
            loc.recreateDatabase()
        else:
            print ("no other functionality yet ...")
        return 0
    except KeyboardInterrupt:
        ### handle keyboard interrupt ###
        return 1
    except Exception as e:
        if DEBUG:
            # bare raise keeps the original traceback
            raise
        indent = len(program_name) * " "
        sys.stderr.write(program_name + ": " + repr(e) + "\n")
        sys.stderr.write(indent + " for help use --help")
        return 2
# script entry point
if __name__ == "__main__":
    # with DEBUG on the -d (debug) option is always passed to the argument parser
    if DEBUG:
        sys.argv.append("-d")
    # the return value of main becomes the exit status of the process
    sys.exit(main())