Tuesday, October 22, 2019

Delete Old log files from HDFS - Older than 10 days

Script (tested):


# Current time in seconds since the epoch
today=$(date +'%s')

# For each directory under the target path (listing lines starting with "d")
hdfs dfs -ls /file/Path/ | grep "^d" | while read -r line ; do
    # Column 6 of the listing is the modification date, column 8 is the full path
    dir_date=$(echo "${line}" | awk '{print $6}')
    filePath=$(echo "${line}" | awk '{print $8}')

    # Age of the directory in days
    difference=$(( ( today - $(date -d "${dir_date}" +%s) ) / ( 24*60*60 ) ))

    if [ "${difference}" -gt 10 ]; then
        hdfs dfs -rm -r "$filePath"
    fi
done



Note: Change the file path to suit your environment. As written, the delete does not skip the trash, so the data goes to the HDFS trash rather than being removed immediately.
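If the space needs to be reclaimed immediately, the delete line can be changed to bypass the trash (use with care, since this is not recoverable):

    hdfs dfs -rm -r -skipTrash "$filePath"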

Tuesday, October 15, 2019

NameNode HA fails over due to connection interruption with JournalNodes

Issue:

Occasionally, NameNode HA fails over due to a network connection interruption with the JournalNodes.

The error message in NameNode log:

2015-11-06 17:39:09,497 FATAL namenode.FSEditLog (JournalSet.java:mapJournalsAndReportErrors(398)) - Error: starting log segment 2710407 failed for required journal (JournalAndStream(mgr=QJM to [172.xxx.xxx.xxx:8485, 172.xxx.xxx.xxx:8485, 172.xxx.xxx.xxx:8485], stream=null))
java.io.IOException: Timed out waiting 20000ms for a quorum of nodes to respond.
at org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet.waitForWriteQuorum(AsyncLoggerSet.java:137)
at org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager.startLogSegment(QuorumJournalManager.java:403)
at org.apache.hadoop.hdfs.server.namenode.JournalSet$JournalAndStream.startLogSegment(JournalSet.java:107)
at org.apache.hadoop.hdfs.server.namenode.JournalSet$3.apply(JournalSet.java:222)

at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
2015-11-06 17:39:09,500 INFO util.ExitUtil (ExitUtil.java:terminate(124)) - Exiting with status 1
2015-11-06 17:39:09,506 INFO namenode.NameNode (StringUtils.java:run(659)) - SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at <FQDN>/172.xxx.xxx.xxx
************************************************************

Cause:

A (potentially transient) network interruption between the active NameNode and the JournalNodes prevents the NameNode from getting a quorum within the default 20-second timeout, so it shuts itself down.


Solution:

Increase the journal quorum write timeout to 60 seconds by adding the
following property to the HDFS configuration (hdfs-site):

Property Name:  dfs.qjournal.write-txns.timeout.ms
Property Value: 60000
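If you manage hdfs-site.xml directly rather than through a management UI, the equivalent entry looks like this:

<property>
  <name>dfs.qjournal.write-txns.timeout.ms</name>
  <value>60000</value>
</property>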

While running a Tez job, it fails with a 'vertex failure' error

The error below is seen while a Hive query runs in TEZ execution mode:

Logs:


Vertex failed, vertexName=Reducer 34, vertexId=vertex_1424999265634_0222_1_23, diagnostics=[Task failed, taskId=task_1424999265634_0222_1_23_000007, diagnostics=[AttemptID:attempt_1424999265634_0222_1_23_000007_0 Info:Error: java.lang.RuntimeException: java.lang.RuntimeException: Reduce operator initialization failed
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:188)
at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:307)
at org.apache.hadoop.mapred.YarnTezDagChild$5.run(YarnTezDagChild.java:564)
at java.se
... 6 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: : init not supported
at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.init(GenericUDAFStreamingEvaluator.java:70)

at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.init(ReduceRecordProcessor.java:160)
... 7 more

CAUSE:

The Tez containers are not allocated enough memory to run the query.

SOLUTION:

Set the following configuration values for the Hive query to increase the memory (a command-line example follows the note below):

tez.am.resource.memory.mb=4096
tez.am.java.opts=-server -Xmx3276m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC

hive.tez.container.size=4096
hive.tez.java.opts=-server -Xmx3276m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC

Note:
- Ensure that the *.opts heap sizes (-Xmx) are about 80% of the corresponding *.mb container sizes, and that the *.mb values can actually be allocated by the NodeManagers.
- If the issue happens again, increase the above values by one minimum container size (yarn.scheduler.minimum-allocation-mb) and rerun the query.
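As a rough sketch, if the query is run from the Hive CLI, the same values can be passed per invocation with --hiveconf (my_query.hql is a placeholder for your script); they can equally be set with SET statements at the top of the query:

hive --hiveconf tez.am.resource.memory.mb=4096 \
     --hiveconf "tez.am.java.opts=-server -Xmx3276m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC" \
     --hiveconf hive.tez.container.size=4096 \
     --hiveconf "hive.tez.java.opts=-server -Xmx3276m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC" \
     -f my_query.hql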

Monday, October 14, 2019

How to limit the number of mappers for a MapReduce job via Java code?

The number of mappers launched is based on the number of input splits.

The input split size is the amount of data handed to each mapper as it is read from HDFS (by default it follows the HDFS block size).

So in order to control the number of mappers, we have to control the split size.

Now, there are two properties we can look into:
- mapred.min.split.size
- mapred.max.split.size (size in bytes)  

For example, if we have a 20 GB file and we want to launch 40 mappers, then we need a split size of 20480 MB / 40 = 512 MB (536870912 bytes) each.

So for that the code would be:  

conf.set("mapred.min.split.size", "536870912");
conf.set("mapred.max.split.size", "536870912");  

Setting "mapred.map.tasks" is just a directive on how many mapper we want to launch. It doesn't guarantee that many mappers. 



More information on this topic:  http://wiki.apache.org/hadoop/HowManyMapsAndReduces

HDFS: I have under/over replicated blocks. How can I find more information?

If HDFS reports under- or over-replicated blocks, you can use the metasave option. The report is written to the NameNode's log directory (here /var/log/hadoop-hdfs):

$ sudo -u hdfs hdfs dfsadmin -metasave metasave-report.txt 

$ cd /var/log/hadoop-hdfs

$ cat metasave-report.txt

Metasave: Blocks waiting for replication: 125

/mapred/tmp/hdfs/.staging/job_1395354337974_1132/job.jar: blk_1073994598_253980 (replicas: l: 6 d: 0 c: 0 e: 0)  11.10.1.9:50010 :  11.12.14.30:50010 :  1.10.14.8:50010 :  1.12.10.2:50010 :  1.20.104.94:50010 :  1.10.14.2:50010 : 

Each block has a line like the one above containing information about its replication state:
l: live replicas
d: decommissioned replicas
c: corrupt replicas
e: excess replicas

This information can be used to determine the next action (such as remove the block if it is corrupt).

Above we can see the block has 6 live replicas and the IP of each datanode that stores one.
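For corrupt or missing blocks specifically, hdfs fsck gives a complementary view (the path below is only an example):

$ sudo -u hdfs hdfs fsck /mapred/tmp -files -blocks -locations
$ sudo -u hdfs hdfs fsck / -list-corruptfileblocks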

Fun with Linux

Multi Processes and threads in Shell Scripting -- Linux

Thursday, October 10, 2019

Some Spark Issues



1. Too many open files

This issue ("Too many open files") is usually fixed by raising the open-file limit (ulimit) to 10000 or more; corrupted disks on the Linux servers can also be a cause. See the example below the error messages.

Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files in system
Caused by: java.io.IOException: Too many open files in system
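A minimal sketch of checking and raising the limit (the user name "kafka" and the value 10000 are examples; running processes must be restarted to pick up the new limit):

$ ulimit -n          # current open-file limit for this shell
$ ulimit -n 10000    # raise it for the current session

# To make it persistent, add lines like these to /etc/security/limits.conf:
kafka  soft  nofile  10000
kafka  hard  nofile  10000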

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

2. Failed to send rpc



These errors basically mean the connection between the Spark driver and the executors is broken, usually because an executor has been killed.
This could happen because of a number of reasons:

i- Cluster is too busy and has hit maximum usage.
We saw this happen far more often when our cluster was too busy and had hit maximum usage.
In that situation executors are accepted onto the worker nodes, but they fail to acquire enough memory there and are therefore killed.


ii- Metaspace attempts to grow
Metaspace attempts to grow beyond the executor(JVM) memory limits, resulting in loss of executors.
The best way to stop this error from appearing is to set the properties below when launching spark-shell or submitting the application with spark-submit:

spark.driver.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m
spark.executor.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m
Please note that depending on your project and code, you may need to increase the values mentioned above.


iii- Network is slow
Network is slow for whatever reason. In our case, this was caused by a change in DNS which resulted in turning off caching.
This case could be fixed by adjusting spark.executor.heartbeatInterval and spark.network.timeout.
Default values for these 2 parameters are 10s and 120s.
You can adjust these two values based on your network; the only constraint is that the latter property, spark.network.timeout, should be greater than spark.executor.heartbeatInterval. A submit-time example is shown below the source link.

From <https://thebipalace.com/2017/08/23/spark-error-failed-to-send-rpc-to-datanode/>
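A hedged example of passing these settings at submit time (the class and jar names are placeholders; tune the timeout values to your workload):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m" \
  --conf "spark.executor.extraJavaOptions=-XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m" \
  --conf spark.executor.heartbeatInterval=30s \
  --conf spark.network.timeout=300s \
  --class com.example.MyApp my-app.jar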

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
3. Container killed by YARN for exceeding memory limits

Error :
Container killed by YARN for exceeding memory limits. 5.0 GB of 5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

https://hvivani.com.ar/2016/03/20/consider-boosting-spark-yarn-executor-memoryoverhead/

For example, if each executor is allocated 6 GB of memory and 3 cores, then roughly 2 GB of memory is available per core (per concurrent task); that is how to view the sizing.

By default the spark.yarn.executor.memoryOverhead parameter is set to 384 MB. This value can be too low depending on your application and data volume.
A suggested value for this parameter is executorMemory * 0.10.
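For example, the overhead can be raised for a single run like this (the values, class, and jar names are placeholders; newer Spark versions call the property spark.executor.memoryOverhead):

spark-submit \
  --executor-memory 5g \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --class com.example.MyApp my-app.jar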
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


4. Failed to bind SparkUI

 ERROR SparkUI: Failed to bind SparkUI

java.net.BindException: Address already in use: Service 'SparkUI' failed after 16 retries (starting from 4303)! Consider explicitly setting the appropriate port for the service 'SparkUI' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries.


You can resolve it by following any one of the solutions below:
Solution 1: Set the Spark UI port explicitly, as shown below
spark-shell --conf "spark.ui.port=10101"
# Here we explicitly set the Spark UI port to a port that is not in use; 10101 is just an example, any free port will do.


Solution 2: Increase spark.port.maxRetries, as shown below
spark-shell --conf "spark.port.maxRetries=100"
# Here we set spark.port.maxRetries to 100, so Spark will try up to 100 ports instead of 16 to find an unused one.


In our environment we assign each job a spark.ui.port with a gap of 20 port numbers between jobs, and this has worked well.


----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

5. To improve the performance of Spark jobs and to avoid memory over-utilization at the cluster level, the admin increased the YARN minimum container allocation:

<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>

====================================================================*******************************====================================================================================
6. This is where the YARN container (executor) logs are aggregated into HDFS. Before aggregation, the executor logs are stored locally (e.g. /hadoop/yarn/log) on each worker node; when the Spark application completes, they are moved from the local directories into the HDFS folder below:


   <property>
      <name>yarn.nodemanager.remote-app-log-dir</name>
      <value>/app-logs</value>
    </property>


Note:
Log aggregation must already be enabled via the yarn.log-aggregation-enable property in yarn-site.xml (see the snippet below).
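For reference, a minimal yarn-site.xml entry enabling aggregation looks like this:

   <property>
      <name>yarn.log-aggregation-enable</name>
      <value>true</value>
    </property>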

====================================================================*******************************====================================================================================
===================================================================================================================================================================================================================
Low Cassandra performance makes the Spark job run slowly (writes to Cassandra take too long)


Steps to check Cassandra write time
     1) Go to the YARN console and click on the running application link.
     2) Click on an "attempt" link.
     3) Click on the "logs" link for one of the executors.
     4) Click on the "stderr" link.
     5) Note the start offset parameter in the URL, which controls how much of the log is displayed.
     6) Change it to a larger negative number so you can see more of the log
     (e.g. -4096 -> -20000), then search the page for the word "wrote" to find the various DB write-time entries. An equivalent command-line approach is shown below.
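Alternatively, if log aggregation is enabled, the same entries can be pulled from the command line (the application ID is a placeholder):

     $ yarn logs -applicationId <application_id> | grep -i "wrote"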
    
    
    
===================================================================================================================================================================================================================

===================================================================================================================================================================================================================

Step-by-step process of changing the Ambari Metrics Collector service from embedded to distributed mode - HDP

Steps:

# STOP AMS service

# Create directories in HDFS

$ hdfs dfs -mkdir -p /apps/ams/metrics
$ hdfs dfs -chown -R ams:hadoop /apps/ams/
$ hdfs dfs -chown -R ams:hadoop /apps/ams/metrics/


 # Copy the metric data from the AMS local directory (the value of hbase.rootdir in Advanced ams-hbase-site used when running in embedded mode) to the HDFS directory.

 $ hdfs dfs -copyFromLocal /opt/log/var/lib/ambari-metrics-collector/hbase/*  /apps/ams/metrics


 # Go to Ambari > Services > Ambari Metrics > Configs.

 # Change the Metrics Service operation mode from embedded to distributed:

   timeline.metrics.service.operation.mode -> distributed

 # In Advanced ams-hbase-site, set:

   hbase.cluster.distributed -> true

 # Change the root directory from local to HDFS:

   hbase.rootdir -> hdfs://<$NAMENODE_FQDN>:8020/apps/ams/metrics


Restart the Ambari Metrics Collector. New HMaster, HRegionServer, and Application History Server daemons will start on the Metrics Collector host; verify with:

 $ ps -ef | grep ams

# Configure the memory settings as per the document below:

https://docs.cloudera.com/HDPDocuments/Ambari-2.7.3.0/using-ambari-core-services/content/amb_customize_ams_environment_specific_settings_for_a_cluster.html

Process to create keytabs which are specific to hosts.


Enabling high availability for Ambari metrics collector


For documentation - https://docs.cloudera.com/HDPDocuments/Ambari-2.7.3.0/managing-high-availability/content/amb_enabling_ams_high_availability.html

Possible Issues


    - AMS Metrics Collector  keeps on crashing:



ERROR Logs from Ambari Metrics Collector log:


  ERROR [main] ApplicationHistoryServer:190 - AHSWebApp failed to start.  org.apache.hadoop.yarn.webapp.WebAppException: Error starting http server

  Caused by: java.net.BindException: Port in use: XXXXXXXXXXXX:6188
        at org.apache.hadoop.http.HttpServer2.constructBindException(HttpServer2.java:1000)

  Caused by: java.net.BindException: Cannot assign requested address

        at sun.nio.ch.Net.bind0(Native Method)

   INFO [main] AbstractService:272 - Service org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer failed in state STARTED; cause: org.apache.hadoop.yarn.exceptions.YarnRuntimeException: AHSWebApp failed to start.


Solution

Check this property; it should be configured as below:

     timeline.metrics.service.webapp.address >> 0.0.0.0:6188
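While troubleshooting, it can also help to confirm what, if anything, is already listening on port 6188 on the collector host (a quick check; use whichever tool is installed):

     $ netstat -tulpn | grep 6188
     $ ss -lntp | grep 6188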

Hive Architecture

Hive Architecture in One Image