Python连接HDFS实现文件上传下载及Pandas转换文本文件到CSV操作

微信扫一扫,分享到朋友圈

Python连接HDFS实现文件上传下载及Pandas转换文本文件到CSV操作

1. 目标

通过hadoop hive或spark等数据计算框架完成数据清洗后的数据在HDFS上

爬虫和机器学习在Python中容易实现

在Linux环境下编写Python没有pyCharm便利

需要建立Python与HDFS的读写通道

2. 实现

安装Python模块pyhdfs

版本:Python3.6, hadoop 2.9

读文件代码如下

from pyhdfs import HdfsClient
client=HdfsClient(hosts='ghym:50070')#hdfs地址
res=client.open('/sy.txt')#hdfs文件路径,根目录/
for r in res:
line=str(r,encoding='utf8')#open后是二进制,str()转换为字符串并转码
print(line)

写文件代码如下

from pyhdfs import HdfsClient
client=HdfsClient(hosts='ghym:50070',user_name='hadoop')#只有hadoop用户拥有写权限
str='hello world'
client.create('/py.txt',str)#创建新文件并写入字符串

上传本地文件到HDFS

from pyhdfs import HdfsClient
client = HdfsClient(hosts='ghym:50070', user_name='hadoop')
client.copy_from_local('d:/pydemo.txt', '/pydemo')#本地文件绝对路径,HDFS目录必须不存在

3. 读取文本文件写入csv

Python安装pandas模块

确认文本文件的分隔符

# pyhdfs读取文本文件,分隔符为逗号,
from pyhdfs import HdfsClient
client = HdfsClient(hosts='ghym:50070', user_name='hadoop')
inputfile=client.open('/int.txt')
# pandas调用读取方法read_table
import pandas as pd
df=pd.read_table(inputfile,encoding='gbk',sep=',')#参数为源文件,编码,分隔符
# 数据集to_csv方法转换为csv
df.to_csv('demo.csv',encoding='gbk',index=None)#参数为目标文件,编码,是否要索引

补充知识: 记 读取hdfs 转 pandas 再经由pandas转为csv的一个坑

工作流程是这样的:

读取 hdfs 的 csv 文件,采用的是 hdfs 客户端提供的 read 方法,该方法返回一个生成器。

将读取到的数据按 逗号 处理,变为一个二维数组。

将二维数组传给 pandas,生成 df。

经若干处理后,将 df 转为 csv 文件并写入hdfs。

问题是这样的:

正常的数据:

ZERO,MEAN,STD,CV,INC,OPP,CS,IS_OUTNET
0,9.233,2.445,0.265,1.202,241,1,0
0,8.667,1.882,0.217,1.049,179,1,0

三行数据,正常走流程,没有任何问题。

异常数据:

ZERO,MEAN,STD,CV,INC,OPP,CS,IS_OUTNET,probability,prediction
0,9.233,2.445,0.265,1.202,241,1,0,'[0.9653901649086855,0.03460983509131456]',0.0
0,8.667,1.882,0.217,1.049,179,1,0,'[0.9653901649086855,0.03460983509131456]',0.0

在每一行中都会有一个数组类似的数据,有一对引号包起来,中间存在逗号,不可以拆分。

为此,我的做法如下:

匹配逗号是被成对引号包围的字符串。

将匹配到的字符串中的逗号替换为特定字符。

将替换后的新字符串替换回原字符串。

在将原字符串中的特定字符串替换为逗号。

本来这样做没有什么问题,但是在经由pandas转为csv的时候,发现原来带引号的字符串变为了前后各带三个引号。

源数据:

处理后的数据:

方法如下:

仔细研究对比了下数据,发现数据里的引号其实只是在纯文本文件中用来标识其为字符串,并不应该存在于实际数据中。

而我每次匹配后都是原封不动替换回去,譬如:

源数据:

"[0.9653901649086855,0.03460983509131456]"

匹配替换后:

"[0.9653901649086855${dot}0.03460983509131456]"

这样传给pandas,它就会认为这个数据是带引号的,在重新转为csv的时候,就会进行转义等操作,导致多出很多引号。

所以解决办法就是在替换之前,将匹配时遇到的引号也去掉:

PATTERN = '(?<=(?P<quote>['"]))([^,]+,[^,]+)+?(?=(?P=quote))'

中间 ([^,]+,[^,]+)+? 要用+?,因为必须确定是有这样的组合才可以,并且非贪婪模式,故不可 ? 或者 *?

(ps:为了方便后面引用前面的匹配,我在环视匹配中创建了一个组)

再来个整体效果:

为了说明效果,引用pandas的自带读取csv方法:

可以看到pandas读取出的该位置数据也是字符串,引号正是作为一个字符串声明而存在。

再次修改正则:

def split_by_dot_escape_quote(string):
"""
按逗号分隔字符串,若其中有引号,将引号内容视为整体
"""
# 匹配引号中的内容,非贪婪,采用正向肯定环视,
# 当左引号(无论单双引)被匹配到,放入组quote,
# 中间的内容任意,但是要用+?,非贪婪,且至少有一次匹配到字符,
# 若*?,则匹配0次也可,并不会匹配任意字符(环视只匹配位置不匹配字符),
# 由于在任意字符后面又限定了前面匹配到的quote,故只会匹配到",
# +?则会限定前面必有字符被匹配,故"",或引号中任意值都可匹配到
pattern = re.compile('(?=(?P<quote>['"])).+?(?P=quote)')
rs = re.finditer(pattern, string)
for data in rs:
# 匹配到的字符串
old_str = data.group()
# 将匹配到的字符串中的逗号替换为特定字符,
# 以便还原到原字符串进行替换
new_str = old_str.replace(',', '${dot}')
# 由于匹配到的引号仅为字符串申明,并不具有实际意义,
# 需要把匹配时遇到的引号都去掉,只替换掉当前匹配组的引号
new_str = re.sub(data.group('quote'), '', new_str)
string = string.replace(old_str, new_str)
sps = string.split(',')
return map(lambda x: x.replace('${dot}', ','), sps)
s = '"2011,603","3510006998","F","5","5","0",""'
print(list(split_by_dot_escape_quote(s)))

运行结果如下:

之前想的正则有些复杂,反而偏离了本意,还是对正则的认识不够深。

以上这篇Python连接HDFS实现文件上传下载及Pandas转换文本文件到CSV操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

时间:2020-06-05

CSV means Comma Separated Values. It is plain text (ansi). The CSV (“Comma Separated Value”) file format is often used to exchange data between disparate applications. The file format, as it is used in Microsoft Excel, has become a pseudo standa

写入txt文件 def text_save(filename, data):#filename为写入CSV文件的路径,data为要写入数据列表. file = open(filename,’a’) for i in range(len(data)): s = str(data[i]).replace(‘[‘,”).replace(‘]’,”)#去除[],这两行按数据不同,可以选择 s = s.replace(“‘”,”).replace(‘,’,”) +’n’ #去除单引号,

本文实例为大家分享了三种方式使用python写数据到csv或xlsx文件,供大家参考,具体内容如下 第一种:使用csv模块,写入到csv格式文件 # -*- coding: utf-8 -*- import csv with open(“my.csv”, “a”, newline=”) as f: writer = csv.writer(f) writer.writerow([“URL”, “predict”, “

本文实例讲述了php写入数据到CSV文件的方法.分享给大家供大家参考.具体实现方法如下: <?php $row = 0; ini_set(‘max_execution_time’, 300); $cate;$item;$value;$us; $fp = fopen(“torah1.csv”, “w”); if (($handle = fopen(“t.csv”, “r”)) !== FALSE) { while (($d

本文实例讲述了Python写入数据到MP3文件中的方法.分享给大家供大家参考.具体分析如下: 通过Mp3的Id3V1数据段的数据来修正Mp3文件的正确名字,但是,有时候这个数据断中的数据是空的,所以这里写一个修改Id3V1数据段的数据的函数,同样是练习. 使用方法: writeMp3Header[ SongName] = ‘测试歌曲名称’ writeMp3Header[ SongPeople] = ‘不得闲’ writeMp3Header[ ZhuanJi] = ‘专辑’ writeMp3Hea

python使用pandas和xlsxwriter读写xlsx文件 已有xlsx文件如下: 1. 读取前n行所有数据 # coding: utf-8 import pandas as pd # 1. 读取前n行所有数据 df = pd.read_excel(‘school.xlsx’)#读取xlsx中第一个sheet data1 = df.head(7) # 读取前7行的所有数据,dataFrame结构 data2 = df.values #list形式,读取表格所有数据 print(“获取到所

程序中经常需要使用excel文件,批量读取文件中的数据 python读取excel文件可以使用xlrd模块 pip install xlrd安装模块 示例: #coding=utf8 import xlrd from os import path import sys filename=’ip.xlsx’ if not path.isfile(filename): print “err: not exists or not file ip.xlsx ” sys.exit() getfi

基于win10系统,python3.6 读取csv 使用csv函数包,安装 pip install csv 使用方法: import csv def fileload(filename = ‘待读取.csv’): csvfile = open(filename, encoding = ‘utf-8’) data = csv.reader(csvfile) dataset = [] for line in data: dataset.append(line) csvfile.close() ret

在python写脚本过程中需要将带有中文的字符串内容写入文件,出现了报错的现象. —————————- UnicodeEncodeError: ‘ascii’ codec can’t encode characters in position 0-1: ordinal not in range(128) —————————- 经过网上搜索出错原因得到结果: python中如果使用系统默认的open方法打开的文件只能写入asc

以下是演示**”如何在Python中复制文件”的九种方法**. shutil copyfile()方法 shutil copy()方法 shutil copyfileobj()方法 shutil copy2()方法 os popen方法 os系统()方法 Thread()方法 子进程调用()方法 子进程check_output()方法 1. Shutil Copyfile()方法 该方法只有在目标可写时才将源的内容复制到目的地.如果您没有写入权限,则会引发IOError. 它通过打开输

在导入文件的时候,Python只搜索当前脚本所在的目录,加载(entry-point)入口脚本运行目录和sys.path中包含的路径例如包的安装地址.所以如果要在当前脚本引用其他文件,除了将文件放在和脚本同一目录下,还有以下几种方法, 1. 将文件所在位置添加到sys.path中 import sys sys.path.insert(0, ‘/path/to/application/app/folder’) # or sys.path.append(‘/path/to/application/a

本文实例讲述了Python实现批量更换指定目录下文件扩展名的方法.分享给大家供大家参考,具体如下: #encoding=utf-8 #author: walker #date: 2013-12-06 #function: 深度遍历指定目录,更换指定扩展名 import os import os.path #读入指定目录并转换为绝对路径 rootdir = raw_input(‘root dir:n’) rootdir = os.path.abspath(rootdir) print(‘abso

Android SharedPreferences源码都不会,怎么通过面试?

上一篇

别让HR再质问我:我费劲招的人,你用缓存问废了,不能简单点?

下一篇

你也可能喜欢

Python连接HDFS实现文件上传下载及Pandas转换文本文件到CSV操作

长按储存图像,分享给朋友