@@ -180,7 +180,11 @@ def process_dataframe(
         if 'ARRAY' in str(dtype) and col_name in df.columns:
             logger.info(f'Converting column {col_name} to PostgreSQL array format')
             df[col_name] = df[col_name].apply(
-                lambda x: '{' + ','.join(str(i) for i in x) + '}' if isinstance(x, list) else x
+                lambda x: (
+                    '{' + ','.join(str(i) for i in x) + '}'
+                    if (hasattr(x, '__iter__') and not isinstance(x, str))
+                    else x
+                )
             )
 
     with engine.begin() as conn:
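The broadened predicate matters because parquet readers often return list-typed columns as numpy arrays rather than Python lists, which the old `isinstance(x, list)` check silently passed through unconverted. A minimal sketch of the new behavior, with `to_pg_array` as a hypothetical named stand-in for the inline lambda:

```python
import numpy as np

# Hypothetical named version of the lambda added in this hunk.
def to_pg_array(x):
    # Any non-string iterable becomes a PostgreSQL array literal.
    if hasattr(x, '__iter__') and not isinstance(x, str):
        return '{' + ','.join(str(i) for i in x) + '}'
    return x  # strings, NaN floats, etc. pass through untouched

print(to_pg_array([1, 2, 3]))           # '{1,2,3}'  (lists, as before)
print(to_pg_array((4, 5)))              # '{4,5}'    (tuples now handled)
print(to_pg_array(np.array([6, 7])))    # '{6,7}'    (numpy arrays now handled)
print(to_pg_array('already-a-string'))  # 'already-a-string'
```

Note that the literal builder does not quote its elements, so values containing commas or braces would still need escaping to satisfy PostgreSQL's array syntax.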
@@ -404,7 +408,7 @@ async def process_files(*, engine, session, files: list[File], chunk_size: int =
         if file.category == 'credits':
             logger.info(f'📚 Loading credit file: {file.url}')
             data = (
-                pd.read_parquet(file.url, engine='fastparquet')
+                pd.read_parquet(file.url, engine='pyarrow')
                 .reset_index(drop=True)
                 .reset_index()
                 .rename(columns={'index': 'id'})
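Besides the engine swap (`engine='pyarrow'` requires the `pyarrow` package to be installed), the chained `reset_index` calls here build a surrogate `id` column from a fresh zero-based index. A sketch with a hypothetical frame standing in for the parquet contents:

```python
import pandas as pd

data = (
    pd.DataFrame({'credit': [10, 20]}, index=[7, 9])  # stand-in for the parquet data
    .reset_index(drop=True)   # discard the original, possibly non-sequential index
    .reset_index()            # materialize the new 0..n-1 index as a column named 'index'
    .rename(columns={'index': 'id'})
)
print(data)
#    id  credit
# 0   0      10
# 1   1      20
```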
@@ -428,7 +432,7 @@ async def process_files(*, engine, session, files: list[File], chunk_size: int =
 
         elif file.category == 'projects':
             logger.info(f'📚 Loading project file: {file.url}')
-            data = pd.read_parquet(file.url, engine='fastparquet')
+            data = pd.read_parquet(file.url, engine='pyarrow')
             df = project_schema.validate(data)
             project_dtype_dict = {
                 'project_id': String,
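The `project_dtype_dict` is truncated by the hunk, but the pattern is presumably the usual one of handing SQLAlchemy types to `DataFrame.to_sql` via its `dtype=` argument so that columns get explicit SQL types rather than pandas' defaults. A self-contained sketch under that assumption, using an in-memory SQLite engine and made-up IDs:

```python
import pandas as pd
from sqlalchemy import String, create_engine

engine = create_engine('sqlite://')  # stand-in for the real PostgreSQL engine
df = pd.DataFrame({'project_id': ['PRJ001', 'PRJ002']})  # hypothetical data

# dtype= pins an explicit SQL type per column instead of relying on inference.
df.to_sql('project', con=engine, if_exists='replace', index=False,
          dtype={'project_id': String})
```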
@@ -482,7 +486,7 @@ async def process_files(*, engine, session, files: list[File], chunk_size: int =
             metrics['file_metrics'][file.id]['category'] = file.category
             try:
                 logger.info(f'📚 Loading clip file: {file.url}')
-                data = pd.read_parquet(file.url, engine='fastparquet')
+                data = pd.read_parquet(file.url, engine='pyarrow')
                 clips_dfs.append(data)
                 update_file_status(file, session, 'success')
                 metrics['file_metrics'][file.id]['status'] = 'success'
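Clip files get the same engine swap; each frame is appended to `clips_dfs`, which suggests the usual accumulate-then-concatenate pattern. A hedged, self-contained sketch of that pattern (the in-memory buffers and the final `concat` are assumptions, not shown in this diff):

```python
import io
import pandas as pd

# Hypothetical in-memory stand-ins for the clip parquet files.
buffers = []
for i in range(2):
    buf = io.BytesIO()
    pd.DataFrame({'clip_id': [i]}).to_parquet(buf, engine='pyarrow')
    buf.seek(0)
    buffers.append(buf)

clips_dfs = [pd.read_parquet(b, engine='pyarrow') for b in buffers]
clips = pd.concat(clips_dfs, ignore_index=True)  # presumably combined downstream
print(clips)
```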