Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I am processing some CSV files in databricks. Most of them are quite small(up to 500 MB).However, some of them have UTF-16 encoding, so I encode them into UTF-8 before reading with Spark. Other thing is that there are some characters that Spark cannot parse, hence the line.replace('":"', ':').

def check_file_encoding(input_file_path: str) -> list:
  if not isinstance(input_file_path, str):
      raise ValueError('Input file path provided is not of type string.')

  if '/dbfs' not in input_file_path[:5]:
      if input_file_path[0] == '/':
          input_file_path = os.path.join('/dbfs', input_file_path[1:])
      else:
          input_file_path = os.path.join('/dbfs', input_file_path)

  with open(input_file_path, 'rb') as raw_data:
      result = chardet.detect(raw_data.read())

  if result['encoding'] == 'UTF-8-SIG':
    result['encoding'] = 'UTF-8'

  return result


def convert_utf16_to_utf8(input_file_path):
    if check_file_encoding(input_file_path)['encoding'] == "UTF-16":
        if input_file_path[:6] != "/dbfs/":
          input_file_path = os.path.join("/dbfs", input_file_path[1:])

        output_file_path = input_file_path.replace(".CSV", "_UTF8.CSV").replace(".csv", "_UTF8.csv")

        with codecs.open(input_file_path, encoding="utf-16") as input_file:
          with codecs.open(output_file_path, "w", encoding="utf-8") as output_file:
              shutil.copyfileobj(input_file, output_file)

        with open(output_file_path, 'r') as input_file, open(input_file_path, 'w') as output_file:
          for line in input_file:
            output_file.write(line.replace('":"', ':'))

The problem is that I have started to get [Errno 22] Invalid argument on the line output_file.write(line.replace('":"', ':')). I have tried to reproduce it but the same set of data can go through successfully and then crash during a different run. I was thinking that maybe it has something to do with memory limitations of the with open(output_file_path, 'r') but running it concurrently didn't reveal anything. File-path is always valid(no additional characters that could break it).

At this moment I would be thankful if you could take a look at this code and tell me what could be potentially wrong. I am aware that this is not the best approach databricks/spark-wise, so any suggestion on how to refactor it in order to be more robust is also welcomed.

I was thinking about something along the lines:

rdd = sc.textFile(file_path, use_unicode=False).map(lambda x: x.decode("UTF-16")).map(lambda x: x.encode("UTF-8")).map(lambda x: x.replace('":"', ':'))

But then I still need to find a way of checking the encoding of the files, haven't really found any spark-ish way of doing that so far.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
2.8k views
Welcome To Ask or Share your Answers For Others

1 Answer

等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...