Note
Go to the end to download the full example code.
Start Simulation Async#
This example sets up and solves a simulation of particles interacting with a rotating L-shaped tube wall, starting the solver with an asynchronous (non-blocking) call.
Perform imports and create a project#
Perform the required imports and create an empty project.
import os
import tempfile
import time
import matplotlib.pyplot as plt
import numpy as np
import ansys.rocky.core as pyrocky
from ansys.rocky.core import examples
# Work inside a fresh temporary directory and build the project file path in it.
project_dir = tempfile.mkdtemp(prefix="pyrocky_")
project_path = os.path.join(project_dir, "rocky-testing.rocky")

# Start a Rocky session, create an empty project in it, and save the project.
rocky = pyrocky.launch_rocky()
project = rocky.api.CreateProject()
project.SaveProject(project_path)
Traceback (most recent call last):
File "C:\Users\ansys\actions-runner\_work\pyrocky\pyrocky\examples\basic_examples\start_simulation_async.py", line 56, in <module>
project = rocky.api.CreateProject()
File "C:\Users\ansys\actions-runner\_work\_tool\Python\3.13.5\x64\Lib\site-packages\Pyro5\client.py", line 510, in __call__
return self.__send(self.__name, args, kwargs)
~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\ansys\actions-runner\_work\_tool\Python\3.13.5\x64\Lib\site-packages\Pyro5\client.py", line 256, in _pyroInvoke
msg = protocol.recv_stub(self._pyroConnection, [protocol.MSG_RESULT])
File "C:\Users\ansys\actions-runner\_work\_tool\Python\3.13.5\x64\Lib\site-packages\Pyro5\protocol.py", line 189, in recv_stub
header = connection.recv(6) # 'PYRO' + 2 bytes protocol version
File "C:\Users\ansys\actions-runner\_work\_tool\Python\3.13.5\x64\Lib\site-packages\Pyro5\socketutil.py", line 435, in recv
return receive_data(self.sock, size)
File "C:\Users\ansys\actions-runner\_work\_tool\Python\3.13.5\x64\Lib\site-packages\Pyro5\socketutil.py", line 166, in receive_data
raise ConnectionClosedError("receiving: connection lost: " + str(x))
Pyro5.errors.ConnectionClosedError: receiving: connection lost: [WinError 10054] An existing connection was forcibly closed by the remote host
Configure the study#
study = project.GetStudy()

# Download the STL geometry of the L-shaped tube and import it into the study
# as a wall. Only the first entry returned by ImportWall is used.
file_name = "Lshape_tube.stl"
file_path = examples.download_file(project_dir, file_name, "pyrocky/geometries")
imported_walls = study.ImportWall(file_path)
wall = imported_walls[0]

# Add a particle without customizing it, so it keeps the default shape (sphere)
# and size distribution (single distribution with a sieve size of 0.1 m).
particle = study.CreateParticle()

# Circular surface to use as the inlet.
circular_surface = study.CreateCircularSurface()
circular_surface.SetMaxRadius(0.8, unit="m")

# Rectangular surface to use as the outlet.
rectangular_surface = study.CreateRectangularSurface()
rectangular_surface.SetLength(3, unit="m")
rectangular_surface.SetWidth(3, unit="m")
rectangular_surface.SetCenter((5, -7.5, 0), unit="m")

# Feed the particle through the circular surface and set the mass flow rate on
# the first input entry of the inlet.
# NOTE(review): no unit is passed to SetMassFlowRate, so the API default applies.
particle_inlet = study.CreateParticleInlet(circular_surface, particle)
input_property_list = particle_inlet.GetInputPropertiesList()
first_input = input_property_list[0]
first_input.SetMassFlowRate(1000)
outlet = study.CreateOutlet(rectangular_surface)

# Define a rotation about the Y axis and apply it to both the outlet surface
# and the wall so that they move together.
motion_frame_source = study.GetMotionFrameSource()
motion_frame = motion_frame_source.NewFrame()
angular_velocity = ((0.0, 0.5, 0.0), "rad/s")
motion_frame.AddRotationMotion(angular_velocity=angular_velocity)
for rotating_part in (rectangular_surface, wall):
    motion_frame.ApplyTo(rotating_part)

# The domain settings define the limits of the region in which particles are
# computed during the simulation.
domain = study.GetDomainSettings()
domain.DisableUseBoundaryLimits()
max_limits = (10, 1, 10)
domain.SetCoordinateLimitsMaxValues(max_limits, unit="m")
Set up the solver and run the simulation#
# Total simulated time, passed to the solver in seconds.
simulation_duration = 5

solver = study.GetSolver()
solver.SetSimulationDuration(simulation_duration, unit="s")

# Start the solver without blocking this script, so results can be polled
# while it runs. `non_blocking` is only available on Rocky 25R1 and onwards.
study.StartSimulation(non_blocking=True)
Postprocess#
Obtain the incoming and outgoing mass flows of the particles while the simulation is running.
particles = study.GetParticles()

# Poll the running simulation every two seconds and report its progress.
while study.IsSimulating():
    # When running an asynchronous simulation, the call to RefreshResults is required
    # to ensure that the results are updated.
    study.RefreshResults()
    times, mass_flow_in = particles.GetNumpyCurve("Particles Mass Flow In", unit="t/h")
    times, mass_flow_out = particles.GetNumpyCurve("Particles Mass Flow Out", unit="t/h")
    print(f"Simulation Progress: {study.GetProgress():.2f} %")
    # A poll may happen before any curve data is available; indexing an empty
    # curve with [-1] would raise IndexError, so report only existing values.
    if len(mass_flow_in) > 0:
        print(f"\tCurrent mass_flow_in: {mass_flow_in[-1]:.2f} t/h")
    if len(mass_flow_out) > 0:
        print(f"\tCurrent mass_flow_out: {mass_flow_out[-1]:.2f} t/h")
    time.sleep(2)
print("Simulation Complete!")

# The last refresh inside the loop can be up to one polling interval old (or
# may never have run if the simulation finished before the first poll), so
# refresh once more before reading the final curves.
study.RefreshResults()
times, mass_flow_in = particles.GetNumpyCurve("Particles Mass Flow In", unit="t/h")
times, mass_flow_out = particles.GetNumpyCurve("Particles Mass Flow Out", unit="t/h")

# Obtain the maximum and minimum velocities of the particles at each time step.
simulation_times = study.GetTimeSet()
velocity_gf = particles.GetGridFunction("Velocity : Translational : Absolute")
time_steps = range(len(simulation_times))
velocity_max = np.array(
    [velocity_gf.GetMax(unit="m/s", time_step=i) for i in time_steps]
)
velocity_min = np.array(
    [velocity_gf.GetMin(unit="m/s", time_step=i) for i in time_steps]
)
Plot curves#
# Two stacked axes: mass flows on top, particle velocity extremes below.
fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1)

# Top plot: mass flow entering and leaving the domain over time.
mass_flow_series = (
    (mass_flow_in, "b", "Mass Flow In"),
    (mass_flow_out, "r", "Mass Flow Out"),
)
for values, line_format, label in mass_flow_series:
    ax1.plot(times, values, line_format, label=label)
ax1.set(xlabel="Time [s]", ylabel="Mass Flow [t/h]")
ax1.legend(loc="upper left")

# Bottom plot: maximum and minimum particle velocity at each output time.
velocity_series = (
    (velocity_max, "b", "Max Velocity"),
    (velocity_min, "r", "Min Velocity"),
)
for values, line_format, label in velocity_series:
    ax2.plot(simulation_times, values, line_format, label=label)
ax2.set(xlabel="Time [s]", ylabel="Velocity [m/s]")
ax2.legend(loc="upper left")

plt.draw()
# Close the Rocky session that was launched at the start of the example.
rocky.close()
# NOTE(review): the pause below presumably gives the Rocky process time to shut
# down before the script ends — confirm. `time` is already imported with the
# other imports, so this re-import is redundant but harmless.
import time
time.sleep(5)
Total running time of the script: (0 minutes 27.481 seconds)