Python API 操作Hadoop hdfs详解

微信扫一扫,分享到朋友圈

Python API 操作Hadoop hdfs详解

http://pyhdfs.readthedocs.io/en/latest/

1:安装

由于是windows环境(linux其实也一样),只要有pip或者setup_install安装起来都是很方便的

>pip install hdfs

2:Client——创建集群连接

> from hdfs import *   > client = Client("http://s100:50070")

其他参数说明:

classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)

url:ip:端口

root:制定的hdfs根目录

proxy:制定登陆的用户身份

timeout:设置的超时时间

session:连接标识

client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)   >>> client.list("/")   [u'home',u'input', u'output', u'tmp']

3:dir——查看支持的方法

>dir(client)

4:status——获取路径的具体信息

其他参数:

status(hdfs_path, strict=True)

hdfs_path:就是hdfs路径

strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回None

5:list——获取指定路径的子目录信息

>client.list("/")  [u'home',u'input', u'output', u'tmp']

其他参数:

list(hdfs_path, status=False)

status:为True时,也返回子目录的状态信息,默认为Flase

6:makedirs——创建目录

>client.makedirs("/123")

其他参数:makedirs(hdfs_path, permission=None)

permission:设置权限

>client.makedirs("/test",permission=777)

7: rename—重命名

>client.rename("/123","/test")

8:delete—删除

>client.delete("/test")

其他参数:

delete(hdfs_path, recursive=False)

recursive:删除文件和其子目录,设置为False如果不存在,则会抛出异常,默认为False

9:upload——上传数据

>client.upload("/test","F:[PPT]Google Protocol Buffers.pdf");

其他参数:

upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None,
chunk_size=65536,progress=None, cleanup=True, **kwargs)

overwrite:是否是覆盖性上传文件

n_threads:启动的线程数目

temp_dir:当overwrite=true时,远程文件一旦存在,则会在上传完之后进行交换

chunk_size:文件上传的大小区间

progress:回调函数来跟踪进度,为每一chunk_size字节。它将传递两个参数,文件上传的路径和传输的字节数。一旦完成,-1将作为第二个参数

cleanup:如果在上传任何文件时发生错误,则删除该文件

10:download——下载

>client.download("/test/NOTICE.txt","/home")

11:read——读取文件

withclient.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:  print reader.read()

其他参数:

read(*args, **kwds)

hdfs_path:hdfs路径

offset:设置开始的字节位置

length:读取的长度(字节为单位)

buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。

encoding:制定编码

chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个类似文件的对象

delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。

progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与- 1作为第二个参数。

问题:

1.

hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/test":root:supergroup:drwxr-xr-x

解决办法是:在配置文件hdfs-site.xml中加入

<property>
<name>dfs.permissions</name>
<value>false</value>
</property>

/usr/local/hadoop-2.6.4/bin/hadoopjar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar-input <输入目录> # 可以指定多个输入路径,例如:-input ‘/user/foo/dir1’ -input ‘/user/foo/dir2’

-inputformat<输入格式 JavaClassName> -output <输出目录>-outputformat <输出格式 JavaClassName> -mapper <mapper executable orJavaClassName> -reducer <reducer executable or JavaClassName>-combiner <combiner executable or JavaClassName> -partitioner<JavaClassName> -cmdenv <name=value> # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个

-file <依赖的文件> #配置文件,字典等依赖

-D<name=value> # 作业的属性配置

Map.py:

#!/usr/local/bin/python
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
if s.strip()!= "":
print "%st%s"% (s, 1)

Reduce.py:

#!/usr/local/bin/python
import sys
current_word = None
count_pool = []
sum = 0
for line in sys.stdin:
word, val = line.strip().split('t')
if current_word== None:
current_word = word
if current_word!= word:
for count in count_pool:
sum += count
print "%st%s"% (current_word, sum)
current_word = word
count_pool = []
sum = 0
count_pool.append(int(val))
for count in count_pool:
sum += count
print "%st%s"% (current_word, str(sum))
Run.sh:
HADOOP_CMD="/data/hadoop-2.7.0/bin/hadoop"
STREAM_JAR_PATH="/data/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar$STREAM_JAR_PATH 
-input $INPUT_FILE_PATH_1 
-output $OUTPUT_PATH 
-mapper"python map.py" 
-reducer "pythonred.py" 
-file ./map.py 
-file ./red.py

目的:通过python模拟mr,计算每年的最高气温。

1. 查看数据文件,需要截取年份和气温,生成key-value对。

[tianyc@TeletekHbase python]$ cat test.dat
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...

2. 编写map,打印key-value对

[tianyc@TeletekHbase python]$ cat map.py
import re
import sys
for line in sys.stdin:
val=line.strip()
(year,temp)=(val[15:19],val[40:45])
print "%st%s" % (year,temp)
[tianyc@TeletekHbase python]$ cat test.dat|python map.py
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078

3. 将结果排序

[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort
1949 +0078
1949 +0111
1950 +0000
1950 -0011
1950 +0022

4. 编写redurce,对map中间结果进行处理,生成最终结果

[tianyc@TeletekHbase python]$ cat red.py
import sys
(last_key,max_val)=(None,0)
for line in sys.stdin:
(key,val)=line.strip().split('t')
if last_key and last_key!=key:
print '%st%s' % (last_key, max_val)
(last_key, max_val)=(key,int(val))
else:
(last_key, max_val)=(key,max(max_val,int(val)))
if last_key:
print '%st%s' % (last_key, max_val)

5. 执行。

[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort|python red.py
1949 111
1950 22

使用python语言进行MapReduce程序开发主要分为两个步骤,一是编写程序,二是用Hadoop Streaming命令提交任务。

还是以词频统计为例

一、程序开发

1、Mapper

for line in sys.stdin:
filelds = line.strip.split(' ')
for item in fileds:
print item+' '+'1'

2、Reducer

import sys
result={}
for line in sys.stdin:
kvs = line.strip().split(' ')
k = kvs[0]
v = kvs[1]
if k in result:
result[k]+=1
else:
result[k] = 1
for k,v in result.items():
print k+' '+v
....

写完发现其实只用map就可以处理了…reduce只用cat就好了

3、运行脚本

1)Streaming简介

Hadoop的MapReduce和HDFS均采用Java进行实现,默认提供Java编程接口,用户通过这些编程接口,可以定义map、reduce函数等等。

但是如果希望使用其他语言编写map、reduce函数怎么办呢?

Hadoop提供了一个框架Streaming,Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用hadoop提供的Java编程接口。

2)运行命令

/.../bin/hadoop streaming
-input /..../input
-output /..../output
-mapper "mapper.py"
-reducer "reducer.py"
-file mapper.py
-file reducer.py
-D mapred.job.name ="wordcount"
-D mapred.reduce.tasks = "1"

3)Streaming常用命令

(1)-input <path>:指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入。

(2)-output <path>:指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次。

(3)-mapper:指定mapper可执行程序或Java类,必须指定且唯一。

(4)-reducer:指定reducer可执行程序或Java类,必须指定且唯一。

(5)-file, -cacheFile, -cacheArchive:分别用于向计算节点分发本地文件、HDFS文件和HDFS压缩文件,具体使用方法参考文件分发与打包。

(6)numReduceTasks:指定reducer的个数,如果设置-numReduceTasks 0或者-reducer NONE则没有reducer程序,mapper的输出直接作为整个作业的输出。

(7)-jobconf | -D NAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考hadoop-default.xml。

-jobconf mapred.job.name=’My Job Name’设置作业名

-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW设置作业优先级

-jobconf mapred.job.map.capacity=M设置同时最多运行M个map任务

-jobconf mapred.job.reduce.capacity=N设置同时最多运行N个reduce任务

-jobconf mapred.map.tasks 设置map任务个数

-jobconf mapred.reduce.tasks 设置reduce任务个数

-jobconf mapred.compress.map.output 设置map的输出是否压缩

-jobconf mapred.map.output.compression.codec 设置map的输出压缩方式

-jobconf mapred.output.compress 设置reduce的输出是否压缩

-jobconf mapred.output.compression.codec 设置reduce的输出压缩方式

-jobconf stream.map.output.field.separator 设置map输出分隔符

例子:

-D stream.map.output.field.separator=: 以冒号进行分隔

-D stream.num.map.output.key.fields=2 指定在第二个冒号处进行分隔,也就是第二个冒号之前的作为key,之后的作为value

(8)-combiner:指定combiner Java类,对应的Java类文件打包成jar文件后用-file分发。

(9)-partitioner:指定partitioner Java类,Streaming提供了一些实用的partitioner实现,参考KeyBasedFiledPartitoner和IntHashPartitioner。

(10)-inputformat, -outputformat:指定inputformat和outputformat Java类,用于读取输入数据和写入输出数据,分别要实现InputFormat和OutputFormat接口。如果不指定,默认使用TextInputFormat和TextOutputFormat。

(11)cmdenv NAME=VALUE:给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值。

(12)-mapdebug, -reducedebug:分别指定mapper和reducer程序失败时运行的debug程序。

(13)-verbose:指定输出详细信息,例如分发哪些文件,实际作业配置参数值等,可以用于调试。

以上这篇Python API 操作Hadoop hdfs详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

时间:2020-06-05

最近,我加入了Cloudera,在这之前,我在计算生物学/基因组学上已经工作了差不多10年.我的分析工作主要是利用Python语言和它很棒的科学计算栈来进行的.但Apache Hadoop的生态系统大部分都是用Java来实现的,也是为Java准备的,这让我很恼火.所以,我的头等大事变成了寻找一些Python可以用的Hadoop框架. 在这篇文章里,我会把我个人对这些框架的一些无关科学的看法写下来,这些框架包括: Hadoop流 mrjob dumbo hadoopy pydoop 其它 最终,在

当我们使用python的hdfs包进行上传和下载文件的时候,总会出现如下问题 requests.packages.urllib3.exceptions.NewConnectionError:<requests.packages.urllib3.connection.HTTPConnection object at 0x7fe87cc37c50>: Failed to establish a new connection: [Errno -2] Name or service not known

如果在平时学习,工作中经常使用 FTP 服务器 ,可以设置成开机自启,在设置之前要先了解几个关于自启的命令: 1.chkconfig 命令 主要作用:用于检查,设置系统的各种服务.其中有几个重要参数,先了解 –list ,chkconfig –list :列出 chkconfig 知道的所有命令,chkconfig 服务名 on /off :开启,关闭服务(一般是开机自启或是关闭) 2.检查是否设置 vsftpd 开机自启, chkconfig –list | grep vsftpd :”

在Python中可以使用paramiko模块中的sftp登陆远程主机,实现上传和下载功能. 1.功能实现 根据输入参数判断是文件还是目录,进行上传和下载 本地参数local需要与远程参数remote类型一致,文件以文件名结尾,目录以结尾 上传和下载的本地和远程目录需要存在 异常捕获 2.代码实现 #!/usr/bin/python # coding=utf-8 import paramiko import os def sftp_upload(host,port,username,passwo

微信提供了文件上传的方法wx.uploadFile来上传我们的图片 wx.chooseImage({ success: function(res) { var tempFilePaths = res.tempFilePaths wx.uploadFile({ url: ‘http://example.weixin.qq.com/upload’, //仅为示例,非真实的接口地址 filePath: tempFilePaths[0], name: ‘file’, formData:{ ‘user’:

本文实例讲述了Python实现FTP上传文件或文件夹实例.分享给大家供大家参考.具体如下: import sys import os import json from ftplib import FTP _XFER_FILE = ‘FILE’ _XFER_DIR = ‘DIR’ class Xfer(object): ””’ @note: upload local file or dirs recursively to ftp server ”’ def __init__(self): s

exam = { ‘math’: ’95’, ‘eng’: ’96’, ‘chn’: ’90’, ‘phy’: ”, ‘chem’: ” } 使用下列遍历的方法删除: 1. for e in exam: 2. if exam[e] == ”: 3. del exam[e] 结果出现下列错误,怎么解决: Traceback (most recent call last): File “Untitled.py”, line 3, in <module> for e in

springmvc后端: @RequestMapping(value=”scoreFileUpload”,produces = “text/html; charset=utf-8”) @ResponseBody public String upload(HttpSession session,@RequestParam(“file1”) MultipartFile file,@RequestParam(“paperId”) S

机器环境: Python 3.6.4 numpy==1.14.0 pandas==0.22.0 解决方法: np.set_printoptions(suppress=True) 默认情况下,ndarray数组采用科学计数法显示: 加入代码后: 以上这篇完美解决python中ndarray 默认用科学计数法显示的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们.

公司开发环境跑在linux上,用了一周都没问题,突然今天无法使用了,具体表现就是一打开pycharm,同步远程解释器就卡在上传helper文件之处,折腾一上午加一中午,用这个方法解决了,写下来避免其他人入坑! 解决:pycharm安装文件夹下 例如C:Program FilesJetBrainsPyCharm 2017.2.3这里面找到并且 删掉skeletons文件夹,重新启动再配置远程环境就好了 以上这篇解决pycharm 远程调试 上传 helpers 卡住的问题就是小编分享给大家的

主要原因是用户目录编码使用了中文,解决方法如下: pip安装python包会加载我的用户目录,我的用户目录恰好是中文的,ascii不能编码. 解决办法是: python目录 Python27Libsite-packages 建一个文件sitecustomize.py 内容写: import sys sys.setdefaultencoding(‘gb2312’) python会自动运行这个文件. 以上这篇完美解决Python 2.7不能正常使用pip install的问题就是小编分享给大家的

1.ajax传过去的参数在controller接受不到 解决:在contoller中增加@RequestParam 例如:saveUploadFile( @RequestParam(“file”) MultipartFile file,HttpServletRequest request) 2.org.springframework.web.multipart.support.MissingServletRequestPartException: Required request pa

Android SharedPreferences源码都不会,怎么通过面试?

上一篇

别让HR再质问我:我费劲招的人,你用缓存问废了,不能简单点?

下一篇

你也可能喜欢

Python API 操作Hadoop hdfs详解

长按储存图像,分享给朋友