import json
import urllib2
from directories import userCachePath
import os
import time
from PIL import Image
import atexit
import threading
import logging
from uuid import UUID
import httplib
import base64
import datetime
log = logging.getLogger(__name__)
#@Singleton
[docs]class PlayerCache:
'''
Used to cache Player names and UUID's, provides an small API to interface with it
'''
PATH = userCachePath
TIMEOUT = 2.5
__shared_state = {}
def __init__(self):
self.__dict__ = self.__shared_state
self.last_error = None
self.error_count = 0
# --- Utility Functions ---
@staticmethod
[docs] def insertSeperators(uuid):
return uuid[:8] + "-" + uuid[8:12] + "-" + uuid[12:16] + "-" + uuid[16:20] + "-" + uuid[20:]
@staticmethod
[docs] def getDeltaTime(timestamp, unit):
t = time.time()
old = datetime.datetime.fromtimestamp(timestamp)
current = datetime.datetime.fromtimestamp(t)
delta = current - old
return getattr(delta, unit, "hours")
def __convert(self, json_in):
for player in json_in:
new_dict = {}
new_dict["Name"] = player["Playername"]
new_dict["Timestamp"] = player["Timestamp"]
new_dict["Successful"] = player["WasSuccessful"]
self._cache["Cache"][player["UUID (No Separator)"]] = new_dict
[docs] def save(self):
if hasattr(self, "_cache"):
fp = open(self.PATH, 'w')
json.dump(self._cache, fp, indent=4, separators=(',', ':'))
fp.close()
[docs] def load(self):
'''
Loads from the usercache.json file if it exists, if not an empty one will be generated
'''
self._cache = {"Version": 2, "Connection Timeout": 2.5, "Cache": {}}
if not os.path.exists(self.PATH):
fp = open(self.PATH, 'w')
json.dump(self._cache, fp)
fp.close()
fp = open(self.PATH, 'r')
try:
json_in = json.load(fp)
if "Version" not in json_in or json_in.get("Version", 0) != 2:
self.__convert(json_in)
else:
self._cache = json_in
except:
log.warning("Usercache.json may be corrupted")
finally:
fp.close()
self.temp_skin_cache = {}
self.TIMEOUT = self._cache.get("Connection Timeout", 2.5)
self.cache_lock = threading.RLock()
self.player_refeshing = threading.Thread(target=self._batchRefreshPlayers)
#self.player_refeshing.daemon(True) # No idea whether to use the property setter function or the attribute, so I'll use both
self.player_refeshing.daemon = True
self.player_refeshing.start()
# --- Refreshing ---
def _batchRefreshPlayers(self):
to_refresh_successful = []
to_refresh_failed = []
to_refresh = []
with self.cache_lock:
# TODO: Put this into a thread, since it could take alot of time to run
# TODO: Handle entries that weren't successful last time the cache was modified
for uuid in self._cache["Cache"].keys():
player = self._cache["Cache"][uuid]
if player["Successful"]:
if self.getDeltaTime(player["Timestamp"], "hours") > 6:
to_refresh_successful.append(uuid)
else:
to_refresh_failed.append(uuid)
to_refresh = to_refresh_successful + to_refresh_failed
for uuid in to_refresh:
if self.last_error and self.error_count >= 4:
break
elif self.last_error:
self.error_count += 1
self._getPlayerInfoUUID(uuid)
self.save()
[docs] def force_refresh(self):
for uuid in self._cache["Cache"].keys():
self.getPlayerInfo(uuid, force=True)
self.save()
# --- Checking if supplied data is in the Cache ---
[docs] def UUIDInCache(self, uuid):
'''
Checks to see if the UUID is already in the cache
:param uuid: The UUID of the player
:type uuid: str
:rtype: bool
'''
return uuid.replace("-", "") in self._cache["Cache"]
[docs] def nameInCache(self, name):
'''
Checks to see if the name is already in the cache
:param name: The name of the player
:type name: str
:rtype: bool
'''
for uuid in self._cache["Cache"].keys():
if self._cache["Cache"][uuid].get("Name", "") == name:
return True
return False
# --- Getting data from the Cache ---
def _getDataFromCacheUUID(self, uuid):
'''
Checks if the UUID is already in the cache
:param uuid: The UUID that might be in the cache
:type uuid: str
:return: The player data that is in the cache for the specified UUID, same format as getPlayerInfo()
:rtype: tuple
'''
clean_uuid = uuid.replace("-","")
player = self._cache["Cache"].get(clean_uuid, {})
return (self.insertSeperators(clean_uuid), player.get("Name", "<Unknown Name>"), clean_uuid)
def _getDataFromCacheName(self, name):
'''
Checks if the Player name is already in the cache
:param name: The name of the Player that might be in the cache
:return: The player data that is in the cache for the specified Player name, same format as getPlayerInfo()
:rtype: tuple
'''
for uuid in self._cache["Cache"].keys():
clean_uuid = uuid.replace("-","")
player = self._cache["Cache"][uuid]
if player.get("Name", "") == name and player.get("Successful", False):
return (self.insertSeperators(clean_uuid), player["Name"], clean_uuid)
return ("<Unknown UUID>", name, "<Unknown UUID>")
def _wasSuccessfulUUID(self, uuid):
'''
Returns whether retrieving the player data was Successful
:param uuid: The UUID of the player to check
:return: True if the last time the player data retrieval from Mojang's API was successful, False otherwise
:rtype: bool
'''
clean_uuid = uuid.replace("-","")
player = self._cache["Cache"].get(clean_uuid, {})
return player.get("Successful", False)
def _wasSuccessfulName(self, name):
'''
Returns whether retrieving the player data was Successful
:param name: The name of the player to check
:return: True if the last time the player data retrieval from Mojang's API was successful, False otherwise
:rtype: bool
'''
for uuid in self._cache["Cache"].keys():
player = self._cache["Cache"][uuid]
if player.get("Name", "") == name:
return player.get("Successful", False)
return False
[docs] def getPlayerInfo(self, arg, force=False, use_old_data=False):
'''
Recommended method to call to get Player data. Roughly determines whether a UUID or Player name was passed in 'arg'
:param arg: Either a UUID or Player name to retrieve from the cache/Mojang's API
:type arg: str
:param force: True if the Player name should be forcefully fetched from Mojang's API
:type force: bool
:return: A tuple with the data in this order: (UUID with separator, Player name, UUID without separator)
:rtype: tuple
'''
try:
UUID(arg, version=4)
if self.UUIDInCache(arg) and self._wasSuccessfulUUID(arg) and not force:
return self._getDataFromCacheUUID(arg)
else:
return self._getPlayerInfoUUID(arg, use_old_data)
except ValueError:
if self.nameInCache(arg) and self._wasSuccessfulName(arg) and not force:
return self._getDataFromCacheName(arg)
else:
return self._getPlayerInfoName(arg)
# --- Player Data Getters ---
def _getPlayerInfoUUID(self, uuid, use_old_data=False):
clean_uuid = uuid.replace("-","")
player = self._cache["Cache"].get(clean_uuid, {})
response = self._getDataFromURL("https://sessionserver.mojang.com/session/minecraft/profile/{}".format(clean_uuid))
if response:
try:
data = response
response = json.loads(response)
player["Name"] = response.get("name", player.get("Name", "<Unknown Name>"))
player["Timestamp"] = time.time()
player["Successful"] = True
self._cache["Cache"][clean_uuid] = player
self.temp_skin_cache[clean_uuid] = data
self.save()
return (self.insertSeperators(clean_uuid), player["Name"], clean_uuid)
except:
player["Successful"] = False
self._cache["Cache"][clean_uuid] = player
if use_old_data and player.get("Name", "<Unknown Name>") != "<Unknown Name>":
return (self.insertSeperators(clean_uuid), player["Name"], clean_uuid)
else:
return (self.insertSeperators(clean_uuid), "<Unknown Name>", clean_uuid)
else:
player["Successful"] = False
self._cache["Cache"][clean_uuid] = player
if use_old_data and player.get("Name", "<Unknown Name>") != "<Unknown Name>":
return (self.insertSeperators(clean_uuid), player["Name"], clean_uuid)
else:
return (self.insertSeperators(clean_uuid), "<Unknown Name>", clean_uuid)
def _getPlayerInfoName(self, name):
response = self._getDataFromURL("https://api.mojang.com/users/profiles/minecraft/{}".format(name))
if response:
try:
response = json.loads(response)
uuid = response["id"]
player = self._cache["Cache"].get(uuid,{})
player["Name"] = response.get("name", player.get("Name", "<Unknown Name>"))
player["Timestamp"] = time.time()
player["Successful"] = True
self._cache["Cache"][uuid] = player
self.save()
return (self.insertSeperators(uuid), player["Name"], uuid)
except:
return ("<Unknown UUID>", name, "<Unknown UUID>")
else:
return ("<Unknown UUID>", name, "<Unknown UUID>")
# --- Skin Getting ---
def _parseSkinResponse(self, response):
try:
resp = json.loads(response)
decoded = base64.b64decode(resp["properties"][0]["value"])
resp = json.loads(decoded)
if "SKIN" in resp["textures"]:
resp = self._getDataFromURL(resp["textures"]["SKIN"]["url"])
return resp
except:
import traceback
print "Couldn't parse skin response JSON"
print traceback.format_exc()
return None
[docs] def getPlayerSkin(self, arg, force_download=True, instance=None):
'''
Gets the player's skin from Mojang's skin servers
:param uuid: The UUID of the player
:type uuid: str
:param force_download: Should the skin be re-downloaded even if it has already been downloaded
:type force: bool
:param instance: The instance of the PlayerTool
:type instance: PlayerTool
:return: The path to the player skin
:rtype: str
'''
toReturn = 'char.png'
uuid_sep, name, uuid = self.getPlayerInfo(arg)
if uuid == "<Unknown UUID>":
return toReturn
player = self._cache["Cache"][uuid]
skin_path = os.path.join("player-skins", uuid_sep.replace("-","_") + ".png")
#temp_skin_path = os.path.join("player-skin", uuid_sep.replace("-","_") + ".temp.png")
try:
if not force_download and os.path.exists(skin_path):
skin = Image.open(skin_path)
if skin.size == (64,64):
skin = skin.crop((0,0,64,32))
skin.save(skin_path)
toReturn = skin_path
elif force_download or not os.path.exists(skin_path):
if uuid in self.temp_skin_cache:
parsed = self._parseSkinResponse(self.temp_skin_cache[uuid])
if parsed is not None:
self._saveSkin(uuid, parsed)
toReturn = skin_path
player["Skin"] = { "Timestamp": time.time() }
self._cache["Cache"][uuid] = player
del self.temp_skin_cache[uuid]
self.save()
else:
response = self._getDataFromURL("https://sessionserver.mojang.com/session/minecraft/profile/{}".format(uuid))
if response is not None:
parsed = self._parseSkinResponse(response)
if parsed is not None:
self._saveSkin(uuid, parsed)
toReturn = skin_path
player["Skin"] = { "Timestamp": time.time() }
self._cache["Cache"][uuid] = player
self.save()
except IOError:
print "Couldn't find Image file ("+skin_path+") or the file may be corrupted"
if instance is not None:
instance.delete_skin(uuid_sep.replace("-","_"))
os.remove(skin_path)
print "Something happened, retrying"
toReturn = self.getPlayerSkin(arg, True, instance)
except Exception:
import traceback
print "Unknown error occurred while reading/downloading skin for "+str(uuid.replace("-","_")+".png")
print traceback.format_exc()
return toReturn
def _saveSkin(self, uuid, data):
if "-" not in uuid:
uuid = self.insertSeperators(uuid)
try:
os.mkdir("player-skins")
except OSError:
pass
skin_path = os.path.join("player-skins", uuid.replace("-","_") + ".png")
with open(skin_path, 'wb') as fp:
fp.write(data)
skin = Image.open(skin_path)
if skin.size == (64,64):
skin = skin.crop((0,0,64,32))
skin.save(skin_path)
def _getDataFromURL(self, url):
import traceback
try:
response = urllib2.urlopen(url, timeout=self.TIMEOUT).read()
self.last_error = False
return response
except urllib2.HTTPError, e:
log.warn("Encountered a HTTPError while trying to access \"" + url + "\"")
log.warn("Error: " + str(e.code))
self.last_error = (e.code == 429)
except urllib2.URLError, e:
log.warn("Encountered an URLError while trying to access \"" + url + "\"")
log.warn("Error: " + str(e.reason))
self.last_error = (str(e.reason) == '_ssl.c:489: The handshake operation timed out')
except httplib.HTTPException:
log.warn("Encountered a HTTPException while trying to access \"" + url + "\"")
except Exception:
log.warn("Unknown error occurred while trying to get data from URL: " + url)
log.warn(traceback.format_exc())
return None
def _postDataToURL(self, url, payload, headers):
import traceback
try:
request = urllib2.Request(url, payload, headers)
response = urllib2.urlopen(request, timeout=self.TIMEOUT).read()
return response
except urllib2.HTTPError, e:
log.warn("Encountered a HTTPError while trying to POST to \"" + url + "\"")
log.warn("Error: " + str(e.code))
except urllib2.URLError, e:
log.warn("Encountered an URLError while trying to POST to \"" + url + "\"")
log.warn("Error: " + str(e.reason))
except httplib.HTTPException:
log.warn("Encountered a HTTPException while trying to POST to \"" + url + "\"")
except Exception:
log.warn("Unknown error occurred while trying to POST data to URL: " + url)
log.warn(traceback.format_exc())
return None
def _cleanup():
if os.path.exists("player-skins"):
for image_file in os.listdir("player-skins"):
fp = None
try:
fp = open(os.path.join("player-skins", image_file), 'rb')
im = Image.open(fp)
if hasattr(im, 'close'):
im.close()
except IOError:
os.remove(os.path.join("player-skins", image_file))
except AttributeError:
pass # I have no idea why an Attribute Error is thrown on .close(), but this fixes it
finally:
if fp and not fp.closed:
fp.close()
atexit.register(_cleanup)
atexit.register(PlayerCache().save)