I am processing some CSV files in Databricks. Most of them are quite small (up to 500 MB). However, some of them are UTF-16 encoded, so I convert them to UTF-8 before reading them with Spark. Another issue is that the files contain some character sequences that Spark cannot parse, hence the line.replace('":"', ':').
import codecs
import os
import shutil

import chardet


def check_file_encoding(input_file_path: str) -> dict:
    if not isinstance(input_file_path, str):
        raise ValueError('Input file path provided is not of type string.')
    # Make sure the path goes through the /dbfs mount.
    if '/dbfs' not in input_file_path[:5]:
        if input_file_path[0] == '/':
            input_file_path = os.path.join('/dbfs', input_file_path[1:])
        else:
            input_file_path = os.path.join('/dbfs', input_file_path)
    # chardet reads the whole file and returns a dict with an 'encoding' key.
    with open(input_file_path, 'rb') as raw_data:
        result = chardet.detect(raw_data.read())
    if result['encoding'] == 'UTF-8-SIG':
        result['encoding'] = 'UTF-8'
    return result

def convert_utf16_to_utf8(input_file_path):
    if check_file_encoding(input_file_path)['encoding'] == "UTF-16":
        if input_file_path[:6] != "/dbfs/":
            input_file_path = os.path.join("/dbfs", input_file_path[1:])
        output_file_path = input_file_path.replace(".CSV", "_UTF8.CSV").replace(".csv", "_UTF8.csv")
        # First pass: transcode UTF-16 to UTF-8 into a separate "_UTF8" file.
        with codecs.open(input_file_path, encoding="utf-16") as input_file:
            with codecs.open(output_file_path, "w", encoding="utf-8") as output_file:
                shutil.copyfileobj(input_file, output_file)
        # Second pass: overwrite the original file, removing the quotes around ':'.
        with open(output_file_path, 'r') as input_file, open(input_file_path, 'w') as output_file:
            for line in input_file:
                output_file.write(line.replace('":"', ':'))
The problem is that I have started to get [Errno 22] Invalid argument on the line output_file.write(line.replace('":"', ':')). I have tried to reproduce it, but the same set of data can go through successfully and then crash during a different run. I was thinking that it might have something to do with memory limitations of the with open(output_file_path, 'r') block, but running it concurrently didn't reveal anything. The file path is always valid (no additional characters that could break it).
At this point I would be thankful if you could take a look at this code and tell me what could potentially be wrong. I am aware that this is not the best approach Databricks/Spark-wise, so any suggestion on how to refactor it to make it more robust is also welcome.
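One refactoring direction, as a minimal sketch only: do the decoding and the replacement in a single pass on local disk, and touch the destination with one sequential copy at the end. This rests on the unconfirmed assumption that the intermittent [Errno 22] comes from doing line-by-line reads and writes through the /dbfs mount rather than from the data itself. The function name convert_utf16_to_utf8_local and its two path parameters are hypothetical, not part of the code above.

```python
import os
import shutil
import tempfile


def convert_utf16_to_utf8_local(src_path: str, dst_path: str) -> None:
    """Decode src_path as UTF-16, drop the quotes around ':' and write UTF-8 to dst_path."""
    # Assumption: the temp directory is on local disk, not on the /dbfs mount.
    fd, tmp_path = tempfile.mkstemp(suffix=".csv")
    os.close(fd)
    try:
        # newline="" keeps the original line endings untouched on both sides.
        with open(src_path, "r", encoding="utf-16", newline="") as src, \
                open(tmp_path, "w", encoding="utf-8", newline="") as tmp:
            for line in src:
                tmp.write(line.replace('":"', ':'))
        # The source is closed by now, so dst_path may be the same as src_path.
        shutil.copyfile(tmp_path, dst_path)
    finally:
        os.remove(tmp_path)
```

Calling convert_utf16_to_utf8_local(path, path) would overwrite the original file in place, which matches what the two-pass version ends up doing, without leaving a "_UTF8" copy behind.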
On the Spark side, I was thinking about something along these lines:
rdd = sc.textFile(file_path, use_unicode=False).map(lambda x: x.decode("UTF-16")).map(lambda x: x.encode("UTF-8")).map(lambda x: x.replace('":"', ':'))
But then I still need to find a way of checking the encoding of the files, and I haven't really found any Spark-ish way of doing that so far.
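For the encoding check itself, a cheaper alternative to running chardet over the whole file would be to look only at the byte order mark in the first few bytes. The sketch below is plain Python rather than anything Spark-specific, and it assumes the UTF-16 files actually start with a BOM, which is common but not guaranteed; files without one fall through to a default. The helper name sniff_bom_encoding is hypothetical.

```python
import codecs

# UTF-32 BOMs are checked first because the UTF-32-LE BOM begins with the
# same two bytes as the UTF-16-LE BOM.
_BOMS = (
    (codecs.BOM_UTF32_LE, "utf-32"),
    (codecs.BOM_UTF32_BE, "utf-32"),
    (codecs.BOM_UTF8, "utf-8-sig"),
    (codecs.BOM_UTF16_LE, "utf-16"),
    (codecs.BOM_UTF16_BE, "utf-16"),
)


def sniff_bom_encoding(path: str, default: str = "utf-8") -> str:
    """Return a Python codec name based on the file's BOM, or default if there is none."""
    with open(path, "rb") as raw:
        head = raw.read(4)  # the longest BOM is 4 bytes
    for bom, encoding in _BOMS:
        if head.startswith(bom):
            return encoding
    return default
```

Because it reads only four bytes per file, this could run on the driver over a list of paths before deciding which files need conversion, with chardet kept as a fallback for files that have no BOM.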