Hive and Spark are both immensely popular tools in the big data world. The Hadoop ecosystem is a framework and suite of tools that tackle the many challenges in dealing with big data, and Hive is planned as an interface or convenience for querying data stored in HDFS: it is essentially a way to express MapReduce-style computation in a SQL-like language, or at least something close to it. In Hive, tables are created as directories on HDFS, and a Hive partition is a way to organize a large table into smaller logical tables based on the values of columns, with one logical table (partition) for each distinct value. Hive was designed to run on MapReduce in Hadoop v1, later ran on YARN, and now Spark is another engine on which Hive queries can run; Hive itself always needs an execution engine underneath. Hive is the best option for performing data analytics on large volumes of data using SQL.

There are two related projects in the Spark ecosystem that provide Hive QL support on Spark: Shark and Spark SQL. The Shark project translates query plans generated by Hive into its own representation and executes them over Spark. Spark SQL is a feature in Spark: it uses Hive's parser as the frontend to provide Hive QL support, and Spark application developers can easily express their data processing logic in SQL, as well as with the other Spark operators, in their code. Spark SQL supports a different use case than Hive, though. While Apache Hive and Spark SQL perform the same action, retrieving data, each does the task in a different way: Hive uses HQL (Hive Query Language), whereas Spark SQL uses Structured Query Language for processing and querying data; Hive provides schema flexibility, partitioning, and bucketing of tables, whereas with Spark SQL it is only possible to read data from an existing Hive installation. Spark SQL also supports reading and writing data stored in Apache Hive and can interact with different versions of the Hive metastore; once the Hive metadata has been retrieved, the data of all Hive tables can be accessed. The HWC library loads data from LLAP daemons to Spark executors in parallel, which makes it more efficient and adaptable than a standard JDBC connection from Spark to Hive. Cloudera's Impala, on the other hand, is a SQL engine on top of Hadoop. Compared with Shark and Spark SQL, our approach by design supports all existing Hive features, including Hive QL (and any future extension) and Hive's integration with authorization, monitoring, auditing, and other operational tools.

Hive on Spark (HIVE-7292) provides Hive with the ability to utilize Apache Spark as its execution engine; in essence, it replaces the MapReduce operations (the Hadoop compute engine) of a Hive query with Spark RDD operations (the Spark execution engine). While Spark SQL is becoming the standard for SQL on Spark, we do realize many organizations have existing investments in Hive; many of these organizations, however, are also eager to migrate to Spark. We therefore propose modifying Hive to add Spark as a third execution backend, parallel to MapReduce and Tez. Here are the main motivations for enabling Hive to run on Spark. Spark user benefits: this feature is very valuable to users who are already using Spark for other data processing and machine learning needs; standardizing on one execution backend is convenient for operational management, and makes it easier to develop expertise to debug issues and make enhancements. Secondly, providing such an alternative further increases Hive's adoption, as it exposes Spark users to a viable, feature-rich, de facto standard SQL tool on Hadoop. Performance: Hive on Spark provides better performance than Hive on MapReduce while offering the same features; Hive queries, especially those involving multiple reducer stages, will run faster, thus improving the user experience, as Tez does. It is also healthy for the Hive project for multiple backends to coexist: users choosing to run Hive on either MapReduce or Tez will have the existing functionality and code paths as they do today, and the success of Hive does not completely depend on the success of either Tez or Spark. In addition, plugging in Spark at the execution layer keeps code sharing at a maximum and contains the maintenance cost, so the Hive community does not need to make specialized investments for Spark; keeping Hive on Spark congruent to Hive on MapReduce and Tez also limits the scope of the project and reduces long-term maintenance.

Spark itself is a framework that's very different from either MapReduce or Tez, and it is the best option for running big data analytics. Its core abstraction is the RDD: RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs, and by applying a series of transformations such as groupBy and filter, or actions such as count and save, RDDs can be processed and analyzed to fulfill what MapReduce jobs can do without having intermediate stages. In fact, many primitive transformations and actions are SQL-oriented, such as join and count. More information about Spark can be found on the Apache Spark page (http://spark.apache.org/), in this blog post (http://blog.cloudera.com/blog/2013/11/putting-spark-to-use-fast-in-memory-computing-for-your-big-data-applications/), and in the Apache Spark JavaDoc (http://spark.apache.org/docs/1.0.0/api/java/index.html). A Hive table is nothing but a bunch of files and folders on HDFS, so, naturally, Hive tables will be treated as RDDs in the Spark execution engine.
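As a minimal, hedged illustration of that idea, the sketch below loads a table's files as an RDD using Spark's Java API; the warehouse path and table name are invented, and a real Hive-on-Spark engine would go through Hive's input formats rather than reading plain text lines.

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TableAsRddSketch {
  public static void main(String[] args) {
    // Local Spark context just for illustration.
    JavaSparkContext sc = new JavaSparkContext("local", "table-as-rdd-sketch");

    // A Hive table is a directory of files on HDFS; the path below is a
    // hypothetical warehouse location, not one taken from this document.
    // Point it at an existing table directory to actually run the snippet.
    JavaRDD<String> tableRows =
        sc.textFile("hdfs:///user/hive/warehouse/sales_db.db/orders");

    // Ordinary RDD transformations and actions can then be applied.
    System.out.println("row count: " + tableRows.count());
    sc.stop();
  }
}
```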
The main design principle is to have no or limited impact on Hive's existing code path, and thus no functional or performance impact. Neither the semantic analyzer nor any logical optimizations will change, and future features (such as new data types, UDFs, logical optimizations, etc.) added to Hive should be automatically available to those users without any customization work to be done in Hive's Spark execution engine. We will not introduce concepts that are foreign to Hive's MapReduce world; on the contrary, we will implement SQL semantics using MapReduce primitives, and the only new thing here is that these MapReduce primitives will be executed in Spark. While we will use MapReduce primitives to implement SQL semantics in the Spark execution engine, union is one exception (in fact, Tez has already deviated from MapReduce practice with respect to union). In any case, this work should not have any impact on other execution engines.

The main work to implement the Spark execution engine for Hive lies in two folds: query planning, where the Hive operator plan produced by the semantic analyzer is further translated into a task plan that Spark can execute, and query execution, where the generated Spark plan gets actually executed in the Spark cluster.

Currently, for a given user query, Hive's semantic analyzer generates an operator plan composed of a graph of logical operators such as TableScanOperator, GroupByOperator, ReduceSinkOperator, and FileSinkOperator. MapReduceCompiler compiles a graph of MapReduceTasks and other helper tasks (such as MoveTask) from the logical operator plan, and Tez behaves similarly, yet generates a TezTask that combines otherwise multiple MapReduce tasks into a single Tez task. Analogously, we will have a SparkTask, depicting a job that will be executed in a Spark cluster, and a SparkWork, which describes the task plan that the Spark job is going to execute upon. Defining SparkWork in terms of MapWork and ReduceWork makes the new concept easier to understand. During task plan generation, SparkCompiler may perform physical optimizations that are suitable for Spark.

How to generate SparkWork from Hive's operator plan, and how to traverse and translate that plan, is left to the implementation; this is very Spark-specific and thus has no exposure to or impact on other components. Physical optimizations and MapReduce plan generation have already been moved out to separate classes as part of the Hive on Tez work, and there seems to be a lot of common logic between Tez and Spark, as well as between MapReduce and Spark. If feasible, we will extract the common logic and package it into a shareable form, leaving the specific implementations to each task compiler, without destabilizing either MapReduce or Tez. A SparkTask instance can be executed by Hive's task execution framework in the same way as other tasks, and Hive will display a task execution plan similar to the one displayed by the "explain" command for MapReduce and Tez: explain statements will look similar, and the command will show a pattern that Hive users are familiar with.
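The toy sketch below is only meant to make that SparkWork idea concrete; the class and field names are hypothetical stand-ins invented for illustration, not Hive's actual plan classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the idea described above: a SparkWork is a
// small graph whose vertices are map-side or reduce-side work descriptions and
// whose edges correspond to shuffle boundaries in the operator plan.
public class SimpleSparkWork {
  public enum Shuffle { PARTITION_ONLY, GROUP_BY_KEY, SORT_BY_KEY }

  public static class Work {            // stand-in for MapWork / ReduceWork
    public final String name;
    public Work(String name) { this.name = name; }
  }

  public static class Edge {            // stand-in for a shuffle edge
    public final Work parent, child;
    public final Shuffle shuffle;
    public Edge(Work parent, Work child, Shuffle shuffle) {
      this.parent = parent; this.child = child; this.shuffle = shuffle;
    }
  }

  private final List<Work> roots = new ArrayList<>();
  private final Map<Work, List<Edge>> children = new HashMap<>();

  public void addRoot(Work mapWork) { roots.add(mapWork); }

  public void connect(Work parent, Work child, Shuffle shuffle) {
    children.computeIfAbsent(parent, k -> new ArrayList<>())
            .add(new Edge(parent, child, shuffle));
  }

  public static void main(String[] args) {
    // A simple group-by style plan in this toy model: one map-side work
    // connected to one reduce-side work by a group-by-key shuffle edge.
    SimpleSparkWork work = new SimpleSparkWork();
    Work map = new Work("MapWork-1");
    Work reduce = new Work("ReduceWork-1");
    work.addRoot(map);
    work.connect(map, reduce, Shuffle.GROUP_BY_KEY);
    System.out.println("root works: " + work.roots.size());
  }
}
```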
On the execution side, Spark's primitives will be applied to the RDDs that represent Hive's table data. Note that Spark's built-in map and reduce transformation operators are functional with respect to each record, whereas Hive's operators need to be initialized before being called to process rows and closed when done processing. For the purpose of using Spark as an alternate execution backend for Hive, we will therefore be using the mapPartitions transformation operator on RDDs, which provides an iterator on a whole partition of data, and the resulting MapFunction and ReduceFunction will have to perform initialization, row processing, and cleanup in a single call() method.

The ExecMapper class implements the MapReduce Mapper interface, but the implementation in Hive contains some code that can be reused for Spark; the new function's implementation will be different, though, as it will be made of Hive's operator chain. Also, because some code in ExecReducer is to be reused, we will likely extract the common code into a separate class, ReducerDriver, so as to be shared by both MapReduce and Spark. (Tez, however, has chosen to create a separate class, RecordProcessor, to do something similar.) Note that this is just a matter of refactoring rather than redesigning. It's also possible we need to extend Spark's Hadoop RDD and implement a Hive-specific RDD; we will find out whether RDD extension is needed, and if so we will need help from the Spark community on the Java APIs.

Spark launches mappers and reducers differently from MapReduce in that a worker may process multiple HDFS splits in a single JVM, while Hive's map-side and reduce-side operator trees currently operate in a single thread in an exclusive JVM. If two ExecMapper instances exist in a single JVM, the mapper that finishes earlier will prematurely terminate the other. Such problems, for example those caused by static variables, have surfaced in the initial prototyping, and the Jetty libraries posed one such challenge during the prototyping. We expect there will be a fair amount of work to make these operator trees thread-safe and contention-free. (Tez probably had the same situation.) However, for the first phase of the implementation, we will focus less on this unless it's easy and obvious.
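Returning to mapPartitions: the sketch below shows why an iterator over a whole partition fits Hive's initialize/process/close operator lifecycle. It assumes the Spark 2.x Java API, and initOperators/processRow/closeOperators are hypothetical stand-ins for calls into Hive's operator tree, replaced here by trivial logic so the snippet runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class MapPartitionsSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "map-partitions-sketch");
    JavaRDD<String> rows = sc.parallelize(Arrays.asList("r1", "r2", "r3"), 2);

    // mapPartitions hands us an iterator over a whole partition, so the
    // per-partition init/process/close lifecycle that Hive operators expect
    // can be wrapped around the row loop.
    JavaRDD<String> processed = rows.mapPartitions((Iterator<String> it) -> {
      List<String> out = new ArrayList<>();
      // initOperators();                  // hypothetical: initialize once per partition
      while (it.hasNext()) {
        out.add(it.next().toUpperCase());  // hypothetical: processRow(row)
      }
      // closeOperators();                 // hypothetical: close once per partition
      return out.iterator();
    });

    System.out.println(processed.collect());
    sc.stop();
  }
}
```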
A shuffle will be used to connect the mapper-side's operations to the reducer-side's operations. Extra attention needs to be paid to the shuffle behavior (key generation, partitioning, sorting, etc.), since Hive extensively uses MapReduce's shuffling in implementing reduce-side join. Fortunately, Spark provides a few transformations that are suitable to substitute for MapReduce's shuffle capability, such as partitionBy, groupByKey, and sortByKey: partitionBy does pure shuffling (no grouping or sorting), groupByKey does shuffling and grouping, and sortByKey() does shuffling plus sorting. Therefore, for each ReduceSinkOperator in a SparkWork, we will need to inject one of these transformations. In Spark we can choose sortByKey only if key order is important (such as for SQL ORDER BY); for instance, Hive's group-by doesn't require the key to be sorted, but MapReduce does it nevertheless. And while sortByKey provides no grouping, it's easy to group the keys, as rows with the same key will come consecutively. The number of partitions can optionally be given for those transformations, which basically dictates the number of reducers.

Hive has reduce-side join as well as map-side join (including map-side hash lookup and map-side sorted merge), and join is rather complicated to implement in the MapReduce world, as manifested in Hive. See Hive on Spark: Join Design Master for the detailed design.

It's expected that Spark is, or will be, able to provide flexible control over the shuffling, as pointed out in the previous section. Some of what is needed is currently not available in the Spark Java API; we expect it will be made available soon with help from the Spark community. Finally, it seems that the Spark community is in the process of improving/changing the shuffle-related APIs; please refer to https://issues.apache.org/jira/browse/SPARK-2044 for the details on Spark shuffle-related improvement. Thus, this part of the design is subject to change.
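A hedged illustration of those three shuffle flavors with Spark's Java API follows; the keys, values, and partition count are invented for the example.

```java
import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ShuffleFlavorsSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "shuffle-flavors-sketch");

    // Stand-in for map-side output: (key, row) pairs.
    JavaPairRDD<String, String> mapOutput = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("k2", "row1"), new Tuple2<>("k1", "row2"), new Tuple2<>("k1", "row3")));

    // partitionBy: pure shuffling, no grouping or sorting.
    JavaPairRDD<String, String> shuffled = mapOutput.partitionBy(new HashPartitioner(2));

    // groupByKey: shuffling plus grouping.
    JavaPairRDD<String, Iterable<String>> grouped = mapOutput.groupByKey(2);

    // sortByKey: shuffling plus sorting; rows with the same key arrive
    // consecutively, so grouping can also be done while iterating a partition.
    JavaPairRDD<String, String> sorted = mapOutput.sortByKey();

    System.out.println(grouped.collect());
    System.out.println(sorted.collect());
    System.out.println(shuffled.getNumPartitions());
    sc.stop();
  }
}
```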
To submit Spark jobs, Hive needs a SparkContext, an object that's instantiated with the user's configuration. When a SparkTask is executed by Hive, such a context object is created in the current user session. One SparkContext per user session is the right thing to do, but it seems that Spark assumes one SparkContext per application because of some thread-safety issues; this can be further investigated and evaluated down the road.

Internally, the SparkTask's execute() method will make RDDs and functions out of a SparkWork instance and submit the execution to the Spark cluster via a Spark client. With the context object, RDDs corresponding to Hive tables are created, and the MapFunction and ReduceFunction built from the plan are applied to the RDDs; some further translation is necessary along the way, since the work descriptions themselves are not directly consumable by Spark. Job execution is then triggered by applying a foreach() transformation on the RDDs with a dummy function (a sketch of this trigger mechanism appears at the end of this section). Basic "job succeeded/failed" status as well as progress will be reported as discussed under job monitoring below.

Presently, a fetch operator is used on the client side to fetch rows from the temporary file produced by the query plan; while this comes for "free" for MapReduce and Tez, we will need to provide an equivalent for Spark, and the same applies to presenting the query result to the user. While we could see the benefits of running local jobs on Spark, such as avoiding sinking data to a file and then reading it from the file into memory, in the short term those tasks will still be executed the same way as they are today; again, this can be investigated and implemented as future work. Spark jobs can be run locally by giving "local" as the master URL, and most testing will be performed in this mode; we will further determine whether this is a good way to run Hive's Spark-related tests.

As for dependencies, there will be a new "ql" dependency on Spark. The Spark jar will be handled the same way Hadoop jars are handled: it will be used during compilation but not included in the final distribution, and it only has to be present to run Spark jobs; it is not needed for either MapReduce or Tez execution. Currently the Spark client library comes in a single jar. However, since Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution; to run Hive code on Spark, certain Hive libraries and their dependencies need to be distributed to the Spark cluster by calling the SparkContext.addJar() method. We can also allow YARN to cache the necessary Spark dependency jars on nodes so that they do not need to be distributed each time an application runs.
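Below is a minimal sketch of that trigger mechanism: transformations are lazy, so nothing runs until an action is applied, and a foreach() with a no-op function is enough to kick off the job. The data and application name are made up.

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ForeachTriggerSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "foreach-trigger-sketch");

    // Transformations are lazy; building this chain does not run anything yet.
    JavaRDD<String> transformed = sc.parallelize(Arrays.asList("r1", "r2", "r3"))
                                    .map(String::toUpperCase);

    // Applying foreach() with a dummy (no-op) function triggers job execution,
    // mirroring how the generated plan described above gets kicked off.
    transformed.foreach(row -> { });

    sc.stop();
  }
}
```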
A Spark job can be monitored via SparkListener APIs. Hive will give appropriate feedback to the user about progress and completion status of the query when running queries on Spark, most likely through a class that handles printing of status as well as reporting the final result. However, it's very likely that the metrics are different from either MapReduce or Tez, not to mention the way to extract them, and Hive's current way of trying to fetch additional information about failed jobs may not be available immediately; this is another area that needs more research.

Spark provides a web UI for each SparkContext while it's running, and Spark's Standalone Mode cluster manager also has its own web UI. Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application; this configures Spark to log Spark events that encode the information displayed in the UI to persisted storage. If an application has logged events over the course of its lifetime, then the Standalone master's web UI will automatically re-render the application's UI after the application has finished, and if Spark is run on Mesos or YARN, it is still possible to reconstruct the UI of a finished application through Spark's history server, provided that the application's event logs exist. For more information about Spark monitoring, visit http://spark.apache.org/docs/latest/monitoring.html.

On the testing side, Hive currently has a coverage problem, as there are a few variables that require a full regression suite run, such as Tez vs. MapReduce and vectorization on vs. off. We propose rotating those variables in pre-commit test runs so that enough coverage is in place while testing time isn't prolonged, and Hive will now have unit tests running against MapReduce, Tez, and Spark. From an infrastructure point of view, we can get sponsorship for more hardware to do continuous integration.

For counters, Spark has accumulators, which are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel; they can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can add support for new types.
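A small sketch of an accumulator used as a MapReduce-style counter is shown below; it assumes the Spark 2.x accumulator API, and the counter name and data are invented (Hive's real counter plumbing would differ).

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class CounterSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "counter-sketch");

    // An accumulator standing in for a MapReduce-style counter, e.g. the
    // number of rows the map-side operator tree processed.
    LongAccumulator rowsProcessed = sc.sc().longAccumulator("ROWS_PROCESSED");

    JavaRDD<String> rows = sc.parallelize(Arrays.asList("r1", "r2", "r3"));
    rows.foreach(row -> rowsProcessed.add(1));   // only ever "added" to; associative

    System.out.println("rows processed = " + rowsProcessed.value());
    sc.stop();
  }
}
```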
It can be seen from the above analysis that the project is simple and clean in terms of functionality and design, while complicated and involved in implementation, which may take significant time and resources. The preceding sections also summarize the improvements needed from the Spark community for the project, such as the shuffle-related changes and Java API additions; potentially more will be needed. Secondly, we expect the integration between Hive and Spark will not always be smooth: functional gaps may be identified and problems may arise, but we expect that the Spark community will be able to address these issues in a timely fashion, and it's expected that the Hive community will work closely with the Spark community to ensure the success of the integration. Therefore, we are going to take a phased approach and expect that the work on optimization and improvement will be ongoing over a relatively long period of time, while all basic functionality will be there in the first phase; further optimization can be done down the road in an incremental manner as we gain more knowledge and experience with Spark.

A handful of Hive optimizations are not included in Spark, for example block-level bitmap indexes and virtual columns (used to build indexes); some of these (such as indexes) are less important due to Spark SQL's in-memory computational model. User-defined functions (UDFs), however, are fully supported, and most performance-related configurations work with the same semantics.

As for versions, the setup described here uses Hive 2.3.4 on Spark 2.4.0; another referenced environment uses Hadoop 2.9.2, Tez 0.9.2, Hive 2.3.4, and Spark 2.4.2, with Hadoop installed in cluster mode. Other versions of Spark may work with a given version of Hive, but this is not guaranteed.

As far as I know, Tez, which is a Hive execution engine, can be run only on YARN, not Kubernetes. There is, however, an alternative way to run Hive on Kubernetes: Spark can be run on Kubernetes, and Spark Thrift Server, which is compatible with HiveServer2, is a great candidate. Now that we have our metastore running, let's define a trivial Spark job example that we can use to test the Hive metastore.
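One possible shape for such a trivial job is sketched below; it assumes a Spark build with Hive support on the classpath and a reachable metastore configured via hive-site.xml, and the query is arbitrary.

```java
import org.apache.spark.sql.SparkSession;

public class MetastoreSmokeTest {
  public static void main(String[] args) {
    // enableHiveSupport() requires Spark's Hive integration jars on the
    // classpath and picks up the metastore location from hive-site.xml.
    SparkSession spark = SparkSession.builder()
        .appName("hive-metastore-smoke-test")
        .master("local[*]")           // local master just for a standalone test run
        .enableHiveSupport()
        .getOrCreate();

    // If the metastore connection works, this lists the databases it knows about.
    spark.sql("SHOW DATABASES").show();

    spark.stop();
  }
}
```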
Lately I have been working on updating the default execution engine of Hive configured on our EMR cluster. The default execution engine on Hive is "tez", and I wanted to update it to "spark", which means that Hive queries are submitted as Spark applications (also called Hive on Spark; support for it was added in HIVE-7292).

The execution engine is controlled by the hive.execution.engine property in hive-site.xml, and the default value for this configuration is still "mr". To use Spark as an execution engine in Hive, update the value of that property: set hive.execution.engine=spark;. If you want to try it temporarily for a specific query or session, run that set command along with your query; the same set command can also be run in Oozie itself, along with your query. Alternatively, in Cloudera Manager, go to Hive -> Configuration and set hive.execution.engine to spark; this is a permanent setup and it will control all sessions, including Oozie. In addition, add the new properties needed in hive-site.xml (for example, setting the Spark serializer to org.apache.spark.serializer.KryoSerializer) and upload all the jars available in $SPARK_HOME/jars to an HDFS folder (for example, hdfs:///xxxx:8020/spark-jars).

Once all the above changes are completed successfully, you can validate the setup using the following steps. Open the Hive shell and verify the value of hive.execution.engine; it should be "spark". Then run any query and check that it is being submitted as a Spark application; in our case, the query showed up with a YARN application id. This is what worked for us. If the Spark client cannot be created, queries fail with an error such as: ERROR : FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session d944d094-547b-44a5-a1bf-77b9a3952fe2. (One reported fix for such client-creation failures is to place the spark-assembly jar in Hive's lib folder.)