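Revolution R Enterprise is a big data analytics tool built on the R language. Its rxImport() function can read delimited text, fixed-format text files, SAS data, SPSS data, data held in databases, and more. rxImport() can also handle semi-structured data such as server log files. Here are the first three lines of one such log file: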

190.12.51.140 - - [24/Feb/2013:01:44:32 -0600] "GET /bin/macosx/leopard/contrib/2.12/FGN_1.5.tgz HTTP/1.0" 200 510166 "-" "R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0)"
190.12.51.140 - - [24/Feb/2013:01:44:39 -0600] "GET /bin/macosx/leopard/contrib/2.12/fgui_1.0-2.tgz HTTP/1.0" 200 404275 "-" "R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0)"
190.12.51.140 - - [24/Feb/2013:01:44:45 -0600] "GET /bin/macosx/leopard/contrib/2.12/fields_6.6.tgz HTTP/1.0" 200 2852202 "-" "R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0)"

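As you can see, the file is not purely space-delimited, but the spaces are still useful as separators. The code below uses rxImport() to read the first five lines of the log file, one row per read: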

# Point to file
dataDir <- "C:/DATA/REVO CRAN LOG"
file <- file.path(dataDir,"sLog.txt")
#-------------------------------------------
# Read 5 rows to see how rxImport handles things
rxImport(inData=file,outFile="test",
         rowsPerRead=1,
         numRows=5,
         overwrite=TRUE)

rxGetInfo(data="test",getVarInfo=TRUE,numRows=2)

This produces a binary XDF file with the following structure:

Number of observations: 5 
Number of variables: 10 
Number of blocks: 5 
Compression type: zlib 
Variable information: 
Var 1: V1, Type: character
Var 2: V2, Type: integer, Low/High: (NA, NA)
Var 3: V3, Type: integer, Low/High: (NA, NA)
Var 4: V4, Type: character
Var 5: V5, Type: character
Var 6: V6, Type: character
Var 7: V7, Type: integer, Low/High: (200, 404)
Var 8: V8, Type: integer, Low/High: (1051, 2852202)
Var 9: V9, Type: character
Var 10: V10, Type: character
Data (2 rows starting with row 1):
             V1 V2 V3                    V4     V5
1 190.12.51.140 NA NA [24/Feb/2013:01:44:32 -0600]
2 190.12.51.140 NA NA [24/Feb/2013:01:44:39 -0600]
                                                            V6  V7     V8 V9
1    GET /bin/macosx/leopard/contrib/2.12/FGN_1.5.tgz HTTP/1.0 200 510166  -
2 GET /bin/macosx/leopard/contrib/2.12/fgui_1.0-2.tgz HTTP/1.0 200 404275  -
                                                     V10
1 R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0)
2 R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0)
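
The ten-column layout above is what you get when a line is split on whitespace while double-quoted strings are kept together as single fields. As a rough illustration only, the base R sketch below reproduces that split with scan(); it is an assumption about the splitting rule, not a description of how rxImport() is implemented, and the names log_line and fields are made up for this example.

```r
# Base R sketch (not rxImport itself): split one log line on whitespace,
# keeping double-quoted strings together as single fields.
log_line <- paste0(
  '190.12.51.140 - - [24/Feb/2013:01:44:32 -0600] ',
  '"GET /bin/macosx/leopard/contrib/2.12/FGN_1.5.tgz HTTP/1.0" ',
  '200 510166 "-" ',
  '"R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0)"'
)

# quote = '"' makes scan() treat each quoted string as one field
fields <- scan(text = log_line, what = character(), quote = '"', quiet = TRUE)

# Label the pieces the same way the import did: V1, V2, ...
names(fields) <- paste0("V", seq_along(fields))
print(fields)
```

Note that the bracketed timestamp is not quoted, so it is broken at its internal space into V4 and V5, which matches the output above.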

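Not perfect, but already quite useful! Note also that rxImport() assigned the column names "V1", "V2", and so on during the import. The code below does some data cleaning while importing the file: it drops some columns and renames others.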

# Import data
colX <- list("V1" = list(type="character",newName = "IP"),
             "V7" = list(type="character", newName = "Status"),
             "V8" = list(type="integer", newName = "NoClue"),
             "V10" = list(type="character", newName = "R_version")
            )

rxImport(
         inData=file,outFile="logData",
         colInfo=colX,
         varsToDrop=c("V2","V3","V9"),
         transformVars = c("V4","V5"),
         transforms=list(Date = substr(V4,2,12),
                         UTC = substr(V4,14,21),
                         Offset = as.numeric(substr(V5,1,5))),
         overwrite=TRUE
        )

rxGetInfo(data="logData",getVarInfo=TRUE,numRows=2)

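Note that the transforms argument performs some basic text processing on each chunk of data as it is read. The previous step produces the following output: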

             IP                    V4     V5
1 190.12.51.140 [24/Feb/2013:01:44:32 -0600]
2 190.12.51.140 [24/Feb/2013:01:44:39 -0600]
                                                            V6 Status NoClue
1    GET /bin/macosx/leopard/contrib/2.12/FGN_1.5.tgz HTTP/1.0    200 510166
2 GET /bin/macosx/leopard/contrib/2.12/fgui_1.0-2.tgz HTTP/1.0    200 404275
                                               R_version        Date      UTC
1 R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0) 24/Feb/2013 01:44:32
2 R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0) 24/Feb/2013 01:44:39
  Offset
1   -600
2   -600
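
To make the character positions in those substr() calls easier to check, here is a small base R sketch that applies the same three expressions to plain vectors. The vectors V4 and V5 are typed in by hand from the output above; nothing here touches the XDF file.

```r
# Stand-alone check of the substr() positions used in the transforms.
# V4 and V5 are copied by hand from the first two imported rows.
V4 <- c("[24/Feb/2013:01:44:32", "[24/Feb/2013:01:44:39")
V5 <- c("-0600]", "-0600]")

Date   <- substr(V4, 2, 12)              # skip "[" and keep dd/Mon/yyyy
UTC    <- substr(V4, 14, 21)             # hh:mm:ss after the ":" separator
Offset <- as.numeric(substr(V5, 1, 5))   # drop the trailing "]"

print(data.frame(Date, UTC, Offset))
```

One thing to keep in mind when using the result: as.numeric() turns the string "-0600" into the number -600, which encodes an offset of minus six hours in hhmm form rather than a count of minutes.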

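Now let's take this one step further. We use the rxDataStep() function to drop V4 and V5, which are no longer needed, and to process the whole data set further. The code below uses a transform function during the data step to split the V6 column into several meaningful fields: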

rxDataStep(inData="logData",outFile="logData_2",
            varsToDrop=c("V4","V5"),
            transformVars = c("V6"),
            transformFunc = function(data) {
                # Each request is "command file protocol"; flatten all the
                # pieces from this chunk into one long character vector
                temp <- unlist(strsplit(data$V6, ' '))
                # Indices of every third piece, starting at 1, 2, and 3
                temp.1 <- seq(from = 1, to = length(temp), by = 3)
                temp.2 <- seq(from = 2, to = length(temp), by = 3)
                temp.3 <- seq(from = 3, to = length(temp), by = 3)
                data$Command <- temp[temp.1]
                data$File <- temp[temp.2]
                data$Protocol <- temp[temp.3]
                # Return the chunk with the new columns added
                data
            },
            overwrite=TRUE)

             IP Status NoClue
1 190.12.51.140    200 510166
2 190.12.51.140    200 404275
                                               R_version        Date      UTC
1 R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0) 24/Feb/2013 01:44:32
2 R (2.12.2 x86_64-apple-darwin9.8.0 x86_64 darwin9.8.0) 24/Feb/2013 01:44:39
  Offset Command                                            File Protocol
1   -600     GET    /bin/macosx/leopard/contrib/2.12/FGN_1.5.tgz HTTP/1.0
2   -600     GET /bin/macosx/leopard/contrib/2.12/fgui_1.0-2.tgz HTTP/1.0
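
The stride-of-three indexing in the transform function relies on every request string having exactly three space-separated parts. As a hedged variant, not the author's code, the base R sketch below does the same split on a hand-made chunk but checks that assumption first. The function name split_request and the mock chunk are hypothetical and exist only for this illustration.

```r
# Mock chunk: a list with one character column, the shape the
# transform function is described as receiving.
chunk <- list(V6 = c(
  "GET /bin/macosx/leopard/contrib/2.12/FGN_1.5.tgz HTTP/1.0",
  "GET /bin/macosx/leopard/contrib/2.12/fgui_1.0-2.tgz HTTP/1.0"
))

# Hypothetical helper: split V6 into Command, File, and Protocol.
split_request <- function(data) {
  parts <- strsplit(data$V6, " ", fixed = TRUE)
  # Fail early if any row does not have exactly three pieces
  stopifnot(all(lengths(parts) == 3))
  # One row per request, one column per piece
  m <- do.call(rbind, parts)
  data$Command  <- m[, 1]
  data$File     <- m[, 2]
  data$Protocol <- m[, 3]
  data
}

out <- split_request(chunk)
str(out)
```

A function of this shape could in principle be passed as transformFunc, but that has not been tried here against a real XDF file.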

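The transform function passed as transformFunc near the end of that code may look a little mysterious. The key to understanding how it works is to realize that rxDataStep() reads a large file one chunk at a time. The data in each chunk is stored as a list, and the processing code has to take that structure into account. If the list structure is not clear, it is best to print the data and look at it. The code below reads 5 rows from the file, puts up to 4 rows in each chunk, and prints the contents of each chunk.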

# Look at what is going on in the chunks
rxImport(inData=file,outFile="test",
        transformFunc = function(data) { 
        print(data);
# Internal variables can tell you about the chunk
        print(paste("chunk starts with row",.rxStartRow,"of file"));
        print(paste("chunk number = ",.rxChunkNum));
        print(paste("number of rows read = ",.rxNumRows));
        data }, 
        rowsPerRead=4, # reads 4 rows into a chunk if available
        numRows=5, # only read 5 rows from the file
        overwrite=TRUE) # overwrite the file if it exists

The code above also shows some built-in variables that are useful when writing transform functions that work on chunks:

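  • .rxStartRow holds the row of the file at which the current chunk starts
  • .rxChunkNum holds the number of the current chunk
  • .rxNumRows holds the number of rows in the current chunk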
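
To get a feel for the values these variables should take, the base R sketch below works out the chunk boundaries for the settings used above (5 rows in total, 4 rows per read). It is plain arithmetic under the assumption that each chunk is filled completely before the last one. It does not call rxImport(), and the names total_rows, rows_per_read, and chunks are made up for this example.

```r
# Simulate the chunk bookkeeping for numRows = 5 and rowsPerRead = 4.
total_rows    <- 5
rows_per_read <- 4

# First file row of each chunk (the role played by .rxStartRow)
start_rows <- seq(from = 1, to = total_rows, by = rows_per_read)

chunks <- data.frame(
  chunk_num = seq_along(start_rows),   # role of .rxChunkNum
  start_row = start_rows,              # role of .rxStartRow
  # The last chunk holds whatever rows are left (role of .rxNumRows)
  num_rows  = pmin(rows_per_read, total_rows - start_rows + 1)
)
print(chunks)
```

Under that assumption the first chunk starts at row 1 with 4 rows, and the second starts at row 5 with the single remaining row.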

The output of the chunk-printing rxImport() call above is available in the downloadable output file.