Summary
Besides Spark's own SQL parser, CarbonData ships its own parser, which can parse and handle certain commands related to managing CarbonData tables.
Data Types
Reference: http://carbondata.apache.org/supported-data-types-in-carbondata.html
Numeric Types
- SMALLINT
- INT/INTEGER
- BIGINT
- DOUBLE
- DECIMAL
- FLOAT
- BYTE
Note: FLOAT and BYTE are supported only for the SDK and FileFormat.
Date/Time Types
- TIMESTAMP
- DATE
String Types
- STRING
- CHAR
- VARCHAR
Note: For strings longer than 32,000 characters, use LONG_STRING_COLUMNS in the table properties. Refer to TBLPROPERTIES in CREATE TABLE for more information.
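For example, a minimal sketch of declaring a long string column via this property (the table and column names here are hypothetical):
CREATE TABLE IF NOT EXISTS long_string_table (
  id INT,
  note STRING)
STORED AS carbondata
TBLPROPERTIES ('LONG_STRING_COLUMNS'='note')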
Complex Types
- arrays: ARRAY
- structs: STRUCT<col_name : data_type COMMENT col_comment, …>
- maps: MAP<primitive_type, data_type>
Note: Only a 2-level complex type schema is supported for now.
Other Types
- BOOLEAN
DDL: Create, Drop, Partition, Bucketing, Alter, CTAS, External Table
Reference: http://carbondata.apache.org/ddl-of-carbondata.html
Create Table
This command creates a CarbonData table by specifying the list of fields together with the table properties. The location where the table is stored can also be specified.
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name[(col_name data_type , ...)]
STORED AS carbondata
[TBLPROPERTIES (property_name=property_value, ...)]
[LOCATION 'path']
CarbonData supports both "STORED AS carbondata" and "USING carbondata". Example code: https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
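For example, a minimal concrete sketch of the CREATE TABLE syntax above (the schema and the SORT_COLUMNS property are illustrative):
CREATE TABLE IF NOT EXISTS productSchema.productSalesTable (
  productNumber INT,
  productName STRING,
  storeCity STRING,
  saleQuantity INT)
STORED AS carbondata
TBLPROPERTIES ('SORT_COLUMNS'='productName')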
Create Table As Select (CTAS)
This feature allows the user to create a CarbonData table from any Parquet, Hive, or Carbon table.
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
STORED AS carbondata
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
AS select_statement;
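For example, a sketch assuming a source table named origin_table already exists (both table names are hypothetical):
CREATE TABLE ctas_table
STORED AS carbondata
AS SELECT * FROM origin_table;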
Create External Table
This command allows the user to create an external table by specifying its location.
CREATE EXTERNAL TABLE [IF NOT EXISTS] [db_name.]table_name
STORED AS carbondata LOCATION '$FilesPath'
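For example, a sketch assuming CarbonData files already exist under the hypothetical HDFS path '/user/carbon/external/sales':
CREATE EXTERNAL TABLE IF NOT EXISTS sales_external
STORED AS carbondata LOCATION '/user/carbon/external/sales'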
Create Database
This command creates a new database; a custom location can optionally be specified.
CREATE DATABASE [IF NOT EXISTS] database_name [LOCATION path];
Table Management
Show Tables
This command lists all tables in the current database or in a specified database.
SHOW TABLES [IN db_Name]
Alter Table
Rename Table
ALTER TABLE [db_name.]table_name RENAME TO new_table_name
Add Columns
ALTER TABLE [db_name.]table_name ADD COLUMNS (col_name data_type,...)
TBLPROPERTIES('DICTIONARY_INCLUDE'='col_name,...',
'DEFAULT.VALUE.COLUMN_NAME'='default_value')
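For example, a sketch of adding two columns, one of them with a default value (the column names are illustrative):
ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING)
TBLPROPERTIES('DEFAULT.VALUE.a1'='10')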
Drop Columns
ALTER TABLE [db_name.]table_name DROP COLUMNS (col_name, ...)
Change Data Type
ALTER TABLE [db_name.]table_name CHANGE col_name col_name changed_column_type
Merge Index
This command merges all the CarbonData index files (.carbonindex) within a segment into a single CarbonData index merge file (.carbonindexmerge). This improves first-query performance.
ALTER TABLE [db_name.]table_name COMPACT 'SEGMENT_INDEX'
Set/Unset Local Dictionary Properties
ALTER TABLE tablename SET TBLPROPERTIES('LOCAL_DICTIONARY_ENABLE'='false','LOCAL_DICTIONARY_THRESHOLD'='1000','LOCAL_DICTIONARY_INCLUDE'='column1','LOCAL_DICTIONARY_EXCLUDE'='column2')
Drop Table
DROP TABLE [IF EXISTS] [db_name.]table_name
Refresh Table
This command registers a Carbon table with the Hive metastore catalog from the existing Carbon table data.
REFRESH TABLE $db_NAME.$table_NAME
Table and Column Comments
When creating a table
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
STORED AS carbondata
[TBLPROPERTIES (property_name=property_value, ...)]
Example
CREATE TABLE IF NOT EXISTS productSchema.productSalesTable (
productNumber Int COMMENT 'unique serial number for product')
COMMENT "This is table comment"
STORED AS carbondata
TBLPROPERTIES ('DICTIONARY_INCLUDE'='productNumber')
When altering a table
ALTER TABLE carbon SET TBLPROPERTIES ('comment'='this table comment is modified'); # add a comment
ALTER TABLE carbon UNSET TBLPROPERTIES ('comment'); # remove the comment
Partition
Standard Partition
Create Partition Table
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type , ...)]
[STORED BY file_format]
[TBLPROPERTIES (property_name=property_value, ...)]
Show Partitions
SHOW PARTITIONS [db_name.]table_name
Drop Partition
ALTER TABLE table_name DROP [IF EXISTS] PARTITION (part_spec, ...)
Insert Overwrite
This command allows you to insert or load data into a specific partition with overwrite.
INSERT OVERWRITE TABLE table_name
PARTITION (column = 'partition_name')
select_statement
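For example, a sketch assuming a table partitioned by a country column (all table and column names are hypothetical):
INSERT OVERWRITE TABLE partitioned_sales
PARTITION (country = 'US')
SELECT id, amount FROM staging_sales WHERE country = 'US'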
CarbonData Partition (HASH, RANGE, LIST)
Note: this is an alpha feature, and this type of partitioning does not support updating or deleting data.
Three partition types are supported (Hash, Range, List), similar to the partitioning features of other systems. CarbonData partitioning can improve query performance when filters are applied on the partition column.
Create Hash Partition Table
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED AS carbondata
[TBLPROPERTIES ('PARTITION_TYPE'='HASH',
'NUM_PARTITIONS'='N' ...)]
Create Range Partition Table
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED AS carbondata
[TBLPROPERTIES ('PARTITION_TYPE'='RANGE',
'RANGE_INFO'='2014-01-01, 2015-01-01, 2016-01-01, ...')]
Create List Partition Table
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED AS carbondata
[TBLPROPERTIES ('PARTITION_TYPE'='LIST',
'LIST_INFO'='A, B, C, ...')]
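As a concrete illustration of the list case (the schema and the LIST_INFO values are illustrative):
CREATE TABLE IF NOT EXISTS list_partition_table(
  col_B INT,
  col_C BIGINT,
  col_E BIGINT)
PARTITIONED BY (col_A STRING)
STORED AS carbondata
TBLPROPERTIES('PARTITION_TYPE'='LIST',
'LIST_INFO'='aaaa, bbbb, (cccc, dddd), eeee')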
Show Partitions
SHOW PARTITIONS [db_name.]table_name
Add a New Partition
ALTER TABLE [db_name].table_name ADD PARTITION('new_partition')
Split a Partition
ALTER TABLE [db_name].table_name SPLIT PARTITION(partition_id) INTO('new_partition1', 'new_partition2'...)
Drop a Partition
Drop the partition but keep the data
ALTER TABLE [db_name].table_name DROP PARTITION(partition_id)
Drop the partition together with its data
ALTER TABLE [db_name].table_name DROP PARTITION(partition_id) WITH DATA
Bucketing
The bucketing feature can be used to distribute/organize table or partition data into multiple files, so that similar records end up in the same file. When creating the table, the user needs to specify the columns to bucket on and the number of buckets; the hash value of the bucketing columns determines which bucket a record goes to. A concrete sketch follows the syntax below.
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type, ...)]
STORED AS carbondata
TBLPROPERTIES('BUCKETNUMBER'='noOfBuckets',
'BUCKETCOLUMNS'='columnname')
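For example, a sketch that buckets a table into 4 buckets on the productName column (the schema is illustrative):
CREATE TABLE IF NOT EXISTS productSchema.productSalesTable (
  productNumber INT,
  saleQuantity INT,
  productName STRING,
  storeCity STRING)
STORED AS carbondata
TBLPROPERTIES ('BUCKETNUMBER'='4',
'BUCKETCOLUMNS'='productName')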
DML: Load, Insert, Update, Delete
Reference: http://carbondata.apache.org/dml-of-carbondata.html
Load Data
Load files into a CarbonData table.
LOAD DATA [LOCAL] INPATH 'folder_path'
INTO TABLE [db_name.]table_name
OPTIONS(property_name=property_value, ...)
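For example, a sketch of loading a CSV file with a few common options; the HDFS path, table name, and column list are hypothetical, while DELIMITER, QUOTECHAR, and FILEHEADER are standard CarbonData load options:
LOAD DATA INPATH 'hdfs://hacluster/data/sample.csv'
INTO TABLE carbontable
OPTIONS('DELIMITER'=',',
'QUOTECHAR'='"',
'FILEHEADER'='empno,empname,designation')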
Insert Data
To be continued.
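Until this section is completed, here is a minimal sketch based on the INSERT syntax shown later in this document (target_table and source_table are hypothetical names):
INSERT INTO TABLE target_table SELECT * FROM source_table;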
Writing a CarbonData (Spark) program in Scala
Create the context
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://127.0.0.1:9000/user/carbon/carbonstore")
carbon.sql("show tables")
Create a database
carbon.sql("CREATE DATABASE dtwave_dev")
Create a table
carbon.sql("CREATE TABLE dtwave_dev.carbon_tablename_new (name String, PhoneNumber String) STORED BY 'carbondata'")
carbon.sql("insert into table dtwave_dev.carbon_tablename_new select * from dtwave_dev.spark_test")
carbon.sql("select * from dtwave_dev.carbon_tablename_new").show
Loading data
SQL
CREATE TABLE tablename (name String, PhoneNumber String) STORED BY "carbondata"
TBLPROPERTIES (...)
LOAD DATA [LOCAL] INPATH 'folder path' [OVERWRITE] INTO TABLE tablename OPTIONS(...)
INSERT INTO TABLE tablename select_statement1 FROM table1;
DataFrame
df.write.format("carbondata").option("tableName", "t1").mode(SaveMode.Overwrite).save()
Query
SELECT projectlist FROM t1 WHERE condlist GROUP BY columns ORDER BY columns
Update
Update one column
UPDATE table1 A
SET A.REVENUE = A.REVENUE - 10 WHERE A.PRODUCT = 'phone'
Update two columns in table1
UPDATE table1 A
SET (A.PRODUCT, A.REVENUE) =
(
SELECT PRODUCT, REVENUE
FROM table2 B
WHERE B.CITY = A.CITY AND B.BROKER = A.BROKER
)
WHERE A.DATE BETWEEN '2017-01-01' AND '2017-01-31'
Delete
DELETE FROM table1 A WHERE A.CUSTOMERID = '123'
carbon.sql("select * from dtwave_dev.carbon_tablename_new").show
carbon.sql("delete from dtwave_dev.carbon_tablename_new a WHERE a.name='1'")
carbon.sql("select * from dtwave_dev.carbon_tablename_new").show
carbon.sql("update dtwave_dev.carbon_tablename_new A SET (A.name) = A.name WHERE A.PhoneNumber = '4'")
carbon.sql("UPDATE dtwave_dev.carbon_tablename_new a SET (a.name, a.PhoneNumber) = ( SELECT '5' as name ,'6' from dtwave_dev.carbon_tablename_new b)")
carbon.sql("UPDATE dtwave_dev.carbon_tablename_new a SET (a.name, a.PhoneNumber) = ( SELECT '5' as name ,'6' as PhoneNumber)")
Corresponding files on HDFS
HDFS directory
/user/carbon/carbonstore/dtwave_dev/carbon_tablename_new
Permission Owner Group Size Replication Block Size Name
drwxr-xr-x hulb supergroup 0 B 0 0 B Fact
drwxr-xr-x hulb supergroup 0 B 0 0 B Metadata
Data files
/user/carbon/carbonstore/dtwave_dev/carbon_tablename_new/Fact/Part0
Permission Owner Group Size Replication Block Size Name
drwxr-xr-x hulb supergroup 0 B 0 0 B Segment_0
drwxr-xr-x hulb supergroup 0 B 0 0 B Segment_1
Metadata files
/user/carbon/carbonstore/dtwave_dev/carbon_tablename_new/Metadata
Permission Owner Group Size Replication Block Size Name
-rw-r--r-- hulb supergroup 16 B 2 128 MB 62acf472-3574-434e-a53f-f45901dff949.dict
-rw-r--r-- hulb supergroup 11 B 2 128 MB 62acf472-3574-434e-a53f-f45901dff949.dictmeta
-rw-r--r-- hulb supergroup 11 B 2 128 MB 62acf472-3574-434e-a53f-f45901dff949_16.sortindex
-rw-r--r-- hulb supergroup 16 B 2 128 MB c5c7949a-a437-41d1-8f47-a7a81e68c4ba.dict
-rw-r--r-- hulb supergroup 11 B 2 128 MB c5c7949a-a437-41d1-8f47-a7a81e68c4ba.dictmeta
-rw-r--r-- hulb supergroup 11 B 2 128 MB c5c7949a-a437-41d1-8f47-a7a81e68c4ba_16.sortindex
-rw-r--r-- hulb supergroup 387 B 2 128 MB schema
-rw-r--r-- hulb supergroup 7.37 KB 2 128 MB tablestatus
-rw-r--r-- hulb supergroup 243 B 2 128 MB tableupdatestatus-1505890299461
Writing a CarbonData (Spark) program in Java
Maven
pom.xml configuration
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-hive</artifactId>
<version>1.4.1</version>
</dependency>
</dependencies>
CarbonDataDemo.java
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.CarbonSession;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession.*;
import org.apache.spark.sql.SparkSession.Builder;
public class CarbonDataDemo {
public static void main(String[] args) {
SparkConf conf = (new SparkConf()).setAppName("JAVA CarbonData Demo").setMaster("spark://192.168.1.1:7077");
SparkContext sc = new SparkContext(conf);
Builder builder = SparkSession.builder().config(sc.getConf());
SparkSession carbon = CarbonSession.CarbonBuilder(builder).getOrCreateCarbonSession("hdfs://192.168.20.30:9000/carbondata/store");
carbon.sql("LOAD DATA INPATH 'hdfs://192.168.1.1:9000/carbondata/sample.csv' INTO TABLE test_table");
carbon.sql("select * from default.test_table").show();
}
}
Writing a CarbonData (Spark) program from a shell script
#!/bin/bash
exec spark2-shell --name spark-sql-test <<!EOF
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession()
carbon.sql("show tables")
!EOF
Reference
https://www.iteblog.com/archives/2078.html