"STATUS_DEMO_04" #26 prio=5 os_prio=31 tid=0x00007fdc89093000 nid=0x6603 waiting for monitor entry [0x00007000083a7000] //抢占锁失败进入阻塞状态 java.lang.Thread.State: BLOCKED (on object monitor) at com.yiyuery.sample.states.ThreadStateDemo$BlockDemo.run(ThreadStateDemo.java:60) - waiting to lock <0x0000000715c640b0> (a java.lang.Class for com.yiyuery.sample.states.ThreadStateDemo$BlockDemo) at java.lang.Thread.run(Thread.java:748)
"STATUS_DEMO_03" #24 prio=5 os_prio=31 tid=0x00007fdc68973800 nid=0x9503 waiting on condition [0x00007000082a4000] //带有超时时间阻塞 java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at com.yiyuery.sample.states.ThreadStateDemo$BlockDemo.run(ThreadStateDemo.java:60) - locked <0x0000000715c640b0> (a java.lang.Class for com.yiyuery.sample.states.ThreadStateDemo$BlockDemo) at java.lang.Thread.run(Thread.java:748)
"STATUS_DEMO_02" #22 prio=5 os_prio=31 tid=0x00007fdc89879000 nid=0x6403 in Object.wait() [0x00007000081a1000] //wait 阻塞等待状态 java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x000000071576a1e0> (a java.lang.Class for com.yiyuery.sample.states.ThreadStateDemo) at java.lang.Object.wait(Object.java:502) at com.yiyuery.sample.states.ThreadStateDemo.lambda$main$1(ThreadStateDemo.java:39) - locked <0x000000071576a1e0> (a java.lang.Class for com.yiyuery.sample.states.ThreadStateDemo) at com.yiyuery.sample.states.ThreadStateDemo$$Lambda$2/1164175787.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
"STATUS_DEMO_O1" #21 prio=5 os_prio=31 tid=0x00007fdc89092000 nid=0x6203 waiting on condition [0x000070000809e000] //带有超时时间阻塞 java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at com.yiyuery.sample.states.ThreadStateDemo.lambda$main$0(ThreadStateDemo.java:27) at com.yiyuery.sample.states.ThreadStateDemo$$Lambda$1/1012570586.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
源码分析
线程的启动
线程调度
.
源码
JDK
publicsynchronizedvoidstart(){ /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) thrownew IllegalThreadStateException();
/* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this);
boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } } //调用JVM的本地方法 privatenativevoidstart0();
JVM
//hotspot-87ee5ee27509/src/os/linux/vm/os_linux.cpp MemNotifyThread::MemNotifyThread(int fd): Thread() { assert(memnotify_thread() == NULL, "we can only allocate one MemNotifyThread"); _fd = fd;
// stack size if (os::Linux::supports_variable_stack_size()) { // calculate stack size if it's not specified by caller if (stack_size == 0) { stack_size = os::Linux::default_stack_size(thr_type);
switch (thr_type) { case os::java_thread: // Java threads use ThreadStackSize which default value can be // changed with the flag -Xss assert (JavaThread::stack_size_at_create() > 0, "this should be set"); stack_size = JavaThread::stack_size_at_create(); break; case os::compiler_thread: if (CompilerThreadStackSize > 0) { stack_size = (size_t)(CompilerThreadStackSize * K); break; } // else fall through: // use VMThreadStackSize if CompilerThreadStackSize is not defined case os::vm_thread: case os::pgc_thread: case os::cgc_thread: case os::watcher_thread: if (VMThreadStackSize > 0) stack_size = (size_t)(VMThreadStackSize * K); break; } }
stack_size = MAX2(stack_size, os::Linux::min_stack_allowed); pthread_attr_setstacksize(&attr, stack_size); } else { // let pthread_create() pick the default value. }
{ // Serialize thread creation if we are running with fixed stack LinuxThreads bool lock = os::Linux::is_LinuxThreads() && !os::Linux::is_floating_stack(); if (lock) { os::Linux::createThread_lock()->lock_without_safepoint_check(); }
pthread_t tid; int ret = pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);
pthread_attr_destroy(&attr);
if (ret != 0) { if (PrintMiscellaneous && (Verbose || WizardMode)) { perror("pthread_create()"); } // Need to clean up stuff we've allocated so far thread->set_osthread(NULL); delete osthread; if (lock) os::Linux::createThread_lock()->unlock(); returnfalse; }
// Store pthread info into the OSThread osthread->set_pthread_id(tid);
// Wait until child thread is either initialized or aborted { Monitor* sync_with_child = osthread->startThread_lock(); MutexLockerEx ml(sync_with_child, Mutex::_no_safepoint_check_flag); while ((state = osthread->get_state()) == ALLOCATED) { sync_with_child->wait(Mutex::_no_safepoint_check_flag); } }
if (lock) { os::Linux::createThread_lock()->unlock(); } }
// Aborted due to thread limit being reached if (state == ZOMBIE) { thread->set_osthread(NULL); delete osthread; returnfalse; }
// The thread is returned suspended (in state INITIALIZED), // and is started higher up in the call chain assert(state == INITIALIZED, "race condition"); returntrue; }
线程的中断
中断方式
设置一个共享变量,通过修改该变量的值来结束 while 循环
通过Interrupt机制唤醒阻塞状态下的线程
示例代码
publicclassTestextendsThread{
privatestaticint count = 0;
@Override publicvoidrun(){ while (!Thread.currentThread().isInterrupted()) { System.out.println(">>>"+count++); } System.out.println("运行结束:interrupt Flag"+Thread.currentThread().isInterrupted()); }
publicstaticvoidmain(String[] args)throws InterruptedException { Thread thread = new Thread(new Test2()); thread.start(); TimeUnit.SECONDS.sleep(3); thread.interrupt(); } } /* Connected to the target VM, address: '127.0.0.1:53223', transport: 'socket' java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at com.yiyuery.sample.interrupt.Test2.run(Test2.java:28) at java.lang.Thread.run(Thread.java:748) Sleep 状态下被中断:interrupt Flag false */
此时线程不会停止:处于睡眠状态的线程被中断后会抛出 InterruptedException,同时中断标志位被复位(上面输出中的 interrupt Flag 为 false),因此需要在 catch 块中决定如何处理这次中断通知。
@Override publicvoidrun(){ while (!Thread.currentThread().isInterrupted()) { try { TimeUnit.SECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("Sleep 状态下被中断:interrupt Flag"+Thread.currentThread().isInterrupted()); //响应中断 Thread.currentThread().interrupt(); } } System.out.println("运行结束:interrupt Flag"+Thread.currentThread().isInterrupted()); } /* Connected to the target VM, address: '127.0.0.1:53882', transport: 'socket' java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at com.yiyuery.sample.interrupt.Test2.run(Test2.java:28) at java.lang.Thread.run(Thread.java:748) Sleep 状态下被中断:interrupt Flagfalse 运行结束:interrupt Flagtrue Disconnected from the target VM, address: '127.0.0.1:53882', transport: 'socket' */
源码
JDK
publicvoidinterrupt(){ if (this != Thread.currentThread()) checkAccess();
synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); } //... privatenativevoidinterrupt0();
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate oop java_thread = JNIHandles::resolve_non_null(jthread); MutexLockerEx ml(thread->threadObj()== java_thread ? NULL : Threads_lock); // We need to re-resolve the java_thread, since a GC might have happened during the // acquire of the lock JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)); if (thr != NULL) { Thread::interrupt(thr); } JVM_END //JVM_Sleep 宏定义 JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis)) JVMWrapper("JVM_Sleep");
if (millis < 0) { THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative"); }
if (millis == 0) { // When ConvertSleepToYield is on, this matches the classic VM implementation of // JVM_Sleep. Critical for similar threading behaviour (Win32) // It appears that in certain GUI contexts, it may be beneficial to do a short sleep // for SOLARIS if (ConvertSleepToYield) { os::yield(); } else { ThreadState old_state = thread->osthread()->get_state(); thread->osthread()->set_state(SLEEPING); os::sleep(thread, MinSleepInterval, false); thread->osthread()->set_state(old_state); } } else { ThreadState old_state = thread->osthread()->get_state(); thread->osthread()->set_state(SLEEPING); if (os::sleep(thread, millis, true) == OS_INTRPT) { // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on // us while we were sleeping. We do not overwrite those. if (!HAS_PENDING_EXCEPTION) { if (event.should_commit()) { event.set_time(millis); event.commit(); } #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1); #else/* USDT2 */ HOTSPOT_THREAD_SLEEP_END( 1); #endif/* USDT2 */ // TODO-FIXME: THROW_MSG returns which means we will not call set_state() // to properly restore the thread state. That's likely wrong. THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); } } thread->osthread()->set_state(old_state); } if (event.should_commit()) { event.set_time(millis); event.commit(); } #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0); #else/* USDT2 */ HOTSPOT_THREAD_SLEEP_END( 0); #endif/* USDT2 */ JVM_END
if (!osthread->interrupted()) { //设置一个中断状态 osthread->set_interrupted(true); // More than one thread can get here with the same value of osthread, // resulting in multiple notifications. We do, however, want the store // to interrupted() to be visible to other threads before we execute unpark(). OrderAccess::fence(); //如果是Sleep状态,进行唤醒,并抛出一个InterruptException ParkEvent * const slp = thread->_SleepEvent ; if (slp != NULL) slp->unpark() ; }
// For JSR166. Unpark even if interrupt status already was set if (thread->is_Java_thread()) ((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ; if (ev != NULL) ev->unpark() ;
@Override public String call()throws Exception { TimeUnit.SECONDS.sleep(2); return"T1 use 2 s"; } }; Future<String> r1 = executorService.submit(task1); Callable<String> task2 = new Callable<String>() {
@Override public String call()throws Exception { TimeUnit.SECONDS.sleep(5); return"T2 use 5 s"; } }; Future<String> r2 = executorService.submit(task2); List<String> result = new ArrayList(); result.add(r1.get(3, TimeUnit.SECONDS)); result.add(r2.get(6, TimeUnit.SECONDS)); return result; } } /* Connected to the target VM, address: '127.0.0.1:61608', transport: 'socket' T1 use 2 s,T2 use 5 s Total use 5036 */
CompletableFuture<String> result = new CompletableFuture<>(); executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T1 use 3s"); result.complete("T1 use 3s "); }); return result; }
privatestatic CompletableFuture<String> doTask2(){ CompletableFuture<String> result = new CompletableFuture<>(); executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(7); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T1 use 7s"); result.complete("T1 use 7s "); }); return result; }
privatestatic CompletableFuture<String> doTask3(){ CompletableFuture<String> result = new CompletableFuture<>(); executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T3 use 10s"); result.complete("T3 use 10s "); }); return result; } } /* T1 use 3s T2 use 7s T3 use 10s T1 use 3s T2 use 7s T3 use 10s 总耗时 AT 1:10005 T1 use 3s ,T2 use 7s ,T3 use 10s 总耗时 AT 2:10011 */
CompletableFuture<String> result = new CompletableFuture<>(); executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T1 use 3s"); result.complete("T1 use 3s "); }); return result; }
privatestatic CompletableFuture<String> doTask2(){ CompletableFuture<String> result = new CompletableFuture<>(); executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(7); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T2 use 7s"); result.complete("T2 use 7s "); }); return result; }
privatestatic CompletableFuture<String> doTask3(){ CompletableFuture<String> result = new CompletableFuture<>(); executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T3 use 10s"); result.complete("T3 use 10s "); }); return result; } } /* 总耗时 AT 1:4 T1 use 3s T2 use 7s t1Result: T1 use 3s t2Result: T2 use 7s 总耗时 AT 3:7005 T3 use 10s t2Result: T2 use 7s t3Result: T3 use 10s 总耗时 AT 2:10004 */
privatestatic CompletableFuture<String> doTask1(){ //注意 T1不再是线程池中异步执行 CompletableFuture<String> result = new CompletableFuture<>(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T1 use 3s"); result.complete("T1 use 3s "); return result; }
privatestatic CompletableFuture<String> doTask2(){ CompletableFuture<String> result = new CompletableFuture<>(); executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(7); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T2 use 7s"); result.complete("T2 use 7s "); }); return result; }
privatestatic CompletableFuture<String> doTask3(){ CompletableFuture<String> result = new CompletableFuture<>(); executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("T3 use 10s"); result.complete("T3 use 10s "); }); return result; } } /* T1 use 3s 总耗时 AT 1:3063 T2 use 7s T3 use 10s t2Result: T2 use 7s t3Result: T3 use 10s 总耗时 AT 2:13064 */
Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。
Rocky Linux
Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。